Re: cache changes precision

2014-07-24 Thread Ron Gonzalez
Cool I'll take a look and give it a try!

Thanks,
Ron

Sent from my iPad

> On Jul 24, 2014, at 10:35 PM, Andrew Ash  wrote:
> 
> Hi Ron,
> 
> I think you're encountering the issue where caching data from Hadoop ends up 
> with many duplicate values instead of what you expect.  Try adding a .clone() 
> to the datum() call.
> 
> The issue is that Hadoop returns the same object many times but with its 
> contents changed.  This is an optimization to prevent allocating and GC'ing 
> an object for every row in Hadoop.  This works fine in Hadoop MapReduce 
> because it's single-threaded and with no caching of the objects.
> 
> Spark though saves a reference to each object it gets back from Hadoop.  So 
> by the end of the partition, Spark ends up with a bunch of references all to 
> the same object!  I think it's just by chance that this ends up changing your 
> average to be rounded.
> 
> Can you try with cloning the records in the map call?  Also look at the 
> contents and see if they're actually changed, or if the resulting RDD after a 
> cache is just the last record "smeared" across all the others.
> 
> Cheers,
> Andrew
> 
> 
>> On Thu, Jul 24, 2014 at 2:41 PM, Ron Gonzalez  wrote:
>> Hi,
>>   I'm doing the following:
>> 
>>   def main(args: Array[String]) = {
>> val sparkConf = new 
>> SparkConf().setAppName("AvroTest").setMaster("local[2]")
>> val sc = new SparkContext(sparkConf)
>> val conf = new Configuration()
>> val job = new Job(conf)
>> val path = new Path("/tmp/a.avro");
>> val schema = AvroUtils.getSchema(conf, path);
>> 
>> AvroJob.setInputKeySchema(job, schema);
>> 
>> val rdd = sc.newAPIHadoopFile(
>>path.toString(),
>>classOf[AvroKeyInputFormat[GenericRecord]],
>>classOf[AvroKey[GenericRecord]],
>>classOf[NullWritable], conf).map(x => x._1.datum())
>> val sum = rdd.map(p => 
>> p.get("SEPAL_WIDTH").asInstanceOf[Float]).reduce(_ + _)
>> val avg = sum/rdd.count()
>> println(s"Sum = $sum")
>> println(s"Avg = $avg")
>>   }
>> 
>> If I run this, it works as expected. When I add .cache() to 
>> 
>> val rdd = sc.newAPIHadoopFile(
>>path.toString(),
>>classOf[AvroKeyInputFormat[GenericRecord]],
>>classOf[AvroKey[GenericRecord]],
>>classOf[NullWritable], conf).map(x => x._1.datum()).cache()
>> 
>> then the command rounds up the average.
>> 
>> Any idea why this works this way? Any tips on how to fix this?
>> 
>> Thanks,
>> Ron
> 


Re: rdd.saveAsTextFile blows up

2014-07-24 Thread Eric Friedman
I ported the same code to scala. No problems. But in pyspark, this fails 
consistently:

ctx = SQLContext(sc)
pf = ctx.parquetFile("...")
rdd = pf.map(lambda x: x)
crdd = ctx.inferSchema(rdd)
crdd.saveAsParquetFile("...")

If I do
rdd = sc.parallelize(["hello", "world"])
rdd.saveAsTextFile(...)

It works. 

Ideas?



> On Jul 24, 2014, at 11:05 PM, Akhil Das  wrote:
> 
> Most likely you are closing the connection with HDFS. Can you paste the piece 
> of code that you are executing?
> 
> We were having a similar problem when we closed the FileSystem object in our 
> code.
> 
> Thanks
> Best Regards
> 
> 
>> On Thu, Jul 24, 2014 at 11:00 PM, Eric Friedman  
>> wrote:
>> I'm trying to run a simple pipeline using PySpark, version 1.0.1
>> 
>> I've created an RDD over a parquetFile and am mapping the contents with a 
>> transformer function and now wish to write the data out to HDFS.
>> 
>> All of the executors fail with the same stack trace (below)
>> 
>> I do get a directory on HDFS, but it's empty except for a file named 
>> _temporary.
>> 
>> Any ideas?
>> 
>> java.io.IOException (java.io.IOException: Filesystem closed)
>> org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:629)
>> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:735)
>> org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:793)
>> java.io.DataInputStream.readFully(DataInputStream.java:195)
>> java.io.DataInputStream.readFully(DataInputStream.java:169)
>> parquet.hadoop.ParquetFileReader$Chunk.<init>(ParquetFileReader.java:369)
>> parquet.hadoop.ParquetFileReader$Chunk.<init>(ParquetFileReader.java:362)
>> parquet.hadoop.ParquetFileReader.readColumnChunkPages(ParquetFileReader.java:411)
>> parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:349)
>> parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:100)
>> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
>> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
>> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122)
>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:966)
>> scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:293)
>> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)
>> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
>> org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
> 


Re: rdd.saveAsTextFile blows up

2014-07-24 Thread Akhil Das
Most likely you are closing the connection with HDFS. Can you paste the
piece of code that you are executing?

We were having a similar problem when we closed the FileSystem object in our
code.
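
A minimal sketch (an assumption, not Eric's code) of the pattern this refers to:
FileSystem.get() hands back a cached, shared instance, so closing it in one task
makes every later HDFS read or write in the same JVM fail with "Filesystem closed".
The path below is just a placeholder.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
val inputExists = fs.exists(new Path("/some/input"))   // "/some/input" is a placeholder
// fs.close()   // don't do this: the instance is shared, and closing it breaks
//              // any other code in the JVM (including Spark's readers) that uses it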

Thanks
Best Regards


On Thu, Jul 24, 2014 at 11:00 PM, Eric Friedman 
wrote:

> I'm trying to run a simple pipeline using PySpark, version 1.0.1
>
> I've created an RDD over a parquetFile and am mapping the contents with a
> transformer function and now wish to write the data out to HDFS.
>
> All of the executors fail with the same stack trace (below)
>
> I do get a directory on HDFS, but it's empty except for a file named
> _temporary.
>
> Any ideas?
>
> java.io.IOException (java.io.IOException: Filesystem closed)
> org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:629)
> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:735)
> org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:793)
> java.io.DataInputStream.readFully(DataInputStream.java:195)
> java.io.DataInputStream.readFully(DataInputStream.java:169)
> parquet.hadoop.ParquetFileReader$Chunk.<init>(ParquetFileReader.java:369)
> parquet.hadoop.ParquetFileReader$Chunk.<init>(ParquetFileReader.java:362)
> parquet.hadoop.ParquetFileReader.readColumnChunkPages(ParquetFileReader.java:411)
> parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:349)
> parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:100)
> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122)
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:966)
> scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:293)
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
> org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
>
>


Re: cache changes precision

2014-07-24 Thread Andrew Ash
Hi Ron,

I think you're encountering the issue where caching data from Hadoop ends
up with many duplicate values instead of what you expect.  Try adding a
.clone() to the datum() call.

The issue is that Hadoop returns the same object many times but with its
contents changed.  This is an optimization to prevent allocating and GC'ing
an object for every row in Hadoop.  This works fine in Hadoop MapReduce
because it's single-threaded and with no caching of the objects.

Spark though saves a reference to each object it gets back from Hadoop.  So
by the end of the partition, Spark ends up with a bunch of references all
to the same object!  I think it's just by chance that this ends up changing
your average to be rounded.

Can you try with cloning the records in the map call?  Also look at the
contents and see if they're actually changed, or if the resulting RDD after
a cache is just the last record "smeared" across all the others.
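
For reference, a minimal sketch of this kind of change (the exact copy method is
an assumption; with Avro GenericRecords one option is a deep copy via GenericData,
as shown here, if a plain clone() isn't available):

import org.apache.avro.generic.{GenericData, GenericRecord}

val rdd = sc.newAPIHadoopFile(
    path.toString(),
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable], conf).map { x =>
  val rec = x._1.datum()
  // copy the record that Hadoop reuses for every row, so the cached RDD does
  // not end up holding many references to one (repeatedly mutated) object
  GenericData.get().deepCopy(rec.getSchema, rec)
}.cache()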

Cheers,
Andrew


On Thu, Jul 24, 2014 at 2:41 PM, Ron Gonzalez  wrote:

> Hi,
>   I'm doing the following:
>
>   def main(args: Array[String]) = {
> val sparkConf = new
> SparkConf().setAppName("AvroTest").setMaster("local[2]")
> val sc = new SparkContext(sparkConf)
> val conf = new Configuration()
> val job = new Job(conf)
> val path = new Path("/tmp/a.avro");
> val schema = AvroUtils.getSchema(conf, path);
>
> AvroJob.setInputKeySchema(job, schema);
>
> val rdd = sc.newAPIHadoopFile(
>path.toString(),
>classOf[AvroKeyInputFormat[GenericRecord]],
>classOf[AvroKey[GenericRecord]],
>classOf[NullWritable], conf).map(x => x._1.datum())
> val sum = rdd.map(p =>
> p.get("SEPAL_WIDTH").asInstanceOf[Float]).reduce(_ + _)
> val avg = sum/rdd.count()
> println(s"Sum = $sum")
> println(s"Avg = $avg")
>   }
>
> If I run this, it works as expected. When I add .cache() to
>
> val rdd = sc.newAPIHadoopFile(
>path.toString(),
>classOf[AvroKeyInputFormat[GenericRecord]],
>classOf[AvroKey[GenericRecord]],
>classOf[NullWritable], conf).map(x => x._1.datum()).cache()
>
> then the command rounds up the average.
>
> Any idea why this works this way? Any tips on how to fix this?
>
> Thanks,
> Ron
>
>


actor serialization error

2014-07-24 Thread Alan Ngai
Hi,

I’m running into a new problem trying to get streaming going.  I have a test 
class that sets up my pipeline and runs it fine.  The actual production 
implementation sets up the pipeline from within an actor.  At first, I ran into 
a bunch of issues relating to the serialization of closures from within the 
actor, so I externalized the pipeline setup to a separate case class.  The test 
class I mentioned invokes this case class to set up the pipeline.  However, 
when the actor invokes the case class to set up the pipeline, I get the 
following exception when the pipeline is actually run.  Anyone encountered this 
before?


Alan

14/07/24 22:31:10 [ERROR] Executor: Exception in task ID 1
 java.lang.IllegalStateException: Trying to deserialize a serialized ActorRef 
without an ActorSystem in scope. Use 
'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'
at akka.actor.SerializedActorRef.readResolve(ActorRef.scala:412)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at 
java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1091)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1805)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1704)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1342)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1704)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1342)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at 
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:499)
at 
org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:75)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:147)
at 
java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1835)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1794)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)

Re: spark streaming actor receiver doesn't play well with kryoserializer

2014-07-24 Thread Alan Ngai
bump.  any ideas?

On Jul 24, 2014, at 3:09 AM, Alan Ngai  wrote:

> it looks like when you configure sparkconfig to use the kryoserializer in 
> combination of using an ActorReceiver, bad things happen.  I modified the 
> ActorWordCount example program from 
> 
> val sparkConf = new SparkConf().setAppName("ActorWordCount")
> 
> to
> 
> val sparkConf = new SparkConf()
>   .setAppName("ActorWordCount")
>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer”)
> 
> and I get the stack trace below.  I figured it might be that Kryo doesn’t 
> know how to serialize/deserialize the actor so I added a registry.  I also 
> added a default empty constructor to SampleActorReceiver just for kicks
> 
> class SerializationRegistry extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
> kryo.register(classOf[SampleActorReceiver])
>   }
> }
> 
> …
> 
> case class SampleActorReceiver[T: ClassTag](var urlOfPublisher: String)
> extends Actor with ActorHelper {
>   def this() = this("")
>   ...
> }
> 
> ...
> val sparkConf = new SparkConf()
>   .setAppName("ActorWordCount")
>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>   .set("spark.kryo.registrator", 
> "org.apache.spark.examples.streaming.SerializationRegistry")
> 
> 
> None of this worked, same stack trace.  Any idea what’s going on?  Is this a 
> known issue and is there a workaround?  
> 
> 14/07/24 02:58:19 [ERROR] OneForOneStrategy: configuration problem while 
> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher 
> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>  akka.actor.ActorInitializationException: exception during creation
>   at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>   at akka.actor.ActorCell.create(ActorCell.scala:578)
>   at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>   at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>   at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: akka.ConfigurationException: configuration problem while creating 
> [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher 
> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>   at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
>   at 
> akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
>   at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
>   at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
>   at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
>   at 
> org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
>   at 
> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>   at 
> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>   at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
>   at akka.actor.Props.newActor(Props.scala:339)
>   at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>   at akka.actor.ActorCell.create(ActorCell.scala:560)
>   ... 9 more
> Caused by: java.lang.IllegalArgumentException: constructor public 
> akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with 
> arguments [class java.lang.Class, class 
> org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
>   at akka.util.Reflect$.instantiate(Reflect.scala:69)
>   at akka.actor.Props.cachedActorClass(Props.scala:203)
>   at akka.actor.Props.actorClass(Props.scala:327)
>   at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
>   at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
>   ... 20 more
> Caused by: java.lang.IllegalArgumentException: wrong number of arguments
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
>   at akka.util.Reflect$.instantiate(Reflect.scala:65)
>   ... 24 more
> 



Re: GraphX Pregel implementation

2014-07-24 Thread Ankur Dave
On Thu, Jul 24, 2014 at 9:52 AM, Arun Kumar  wrote:

> While using the Pregel API for iterations, how do I figure out which superstep
> the iteration is currently in?


The Pregel API doesn't currently expose this, but it's very straightforward
to modify Pregel.scala to do so. Let me know if you'd like help doing this.
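
A user-level workaround (a sketch, not part of the Pregel API) is to carry a
superstep counter in the vertex attribute. The vertex program runs once at
initialization and then once per superstep in which the vertex receives a
message, so the counter tracks the superstep for vertices that stay active:

import org.apache.spark.graphx._

// assume graph: Graph[Double, Double] has already been built
val withStep = graph.mapVertices((id, v) => (v, 0))

val result = Pregel(withStep, 0.0, 10)(
  // vertex program: `step` counts how many times this vertex has been activated
  (id, attr, msg) => {
    val (value, step) = attr
    (math.max(value, msg), step + 1)
  },
  // send the current value along each edge
  triplet => Iterator((triplet.dstId, triplet.srcAttr._1)),
  // merge incoming messages
  (a, b) => math.max(a, b)
)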

Ankur 


Need help, got java.lang.ExceptionInInitializerError in Yarn-Client/Cluster mode

2014-07-24 Thread Jianshi Huang
I can successfully run my code in local mode using spark-submit (--master
local[4]), but I got ExceptionInInitializerError errors in Yarn-client mode.

Any hints as to what the problem is? Is it a closure serialization problem? How
can I debug it? Your answers would be very helpful.

14/07/25 11:48:14 WARN scheduler.TaskSetManager: Loss was due to
java.lang.ExceptionInInitializerError
java.lang.ExceptionInInitializerError
at
com.paypal.risk.rds.granada.storage.hbase.HBaseStore$$anonfun$1$$anonfun$apply$1.apply(HBaseStore.scala:40)
at
com.paypal.risk.rds.granada.storage.hbase.HBaseStore$$anonfun$1$$anonfun$apply$1.apply(HBaseStore.scala:36)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1016)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:847)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:847)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Spark Function setup and cleanup

2014-07-24 Thread Yanbo Liang
You can refer to this topic
http://www.mapr.com/developercentral/code/loading-hbase-tables-spark
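
For the setup()/cleanup() part of the question, the usual Spark pattern is
mapPartitions, which gives you one hook per partition so a connection can be
opened and closed exactly once there. A minimal sketch in Scala (the Java API
has the same mapPartitions operation); `records`, `openConnection` and
`lookupUser` are hypothetical placeholders for your own RDD and HBase/DB client:

val enriched = records.mapPartitions { iter =>
  val conn = openConnection()              // "setup": once per partition
  val results = iter.map { record =>
    (record, lookupUser(conn, record))     // enrich each record from the DB
  }.toList                                 // materialize before closing the connection
  conn.close()                             // "cleanup": once per partition
  results.iterator
}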


2014-07-24 22:32 GMT+08:00 Yosi Botzer :

> In my case I want to reach HBase. For every record with userId I want to
> get some extra information about the user and add it to result record for
> further prcessing
>
>
> On Thu, Jul 24, 2014 at 9:11 AM, Yanbo Liang  wrote:
>
>> If you want to connect to DB in program, you can use JdbcRDD (
>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
>> )
>>
>>
>> 2014-07-24 18:32 GMT+08:00 Yosi Botzer :
>>
>> Hi,
>>>
>>> I am using the Java api of Spark.
>>>
>>> I wanted to know if there is a way to run some code in a manner that is
>>> like the setup() and cleanup() methods of Hadoop Map/Reduce
>>>
>>> The reason I need it is because I want to read something from the DB
>>> according to each record I scan in my Function, and I would like to open
>>> the DB connection only once (and close it only once).
>>>
>>> Thanks
>>>
>>
>>
>


Re: mapToPair vs flatMapToPair vs flatMap function usage.

2014-07-24 Thread Matei Zaharia
The Pair ones return a JavaPairRDD, which has additional operations on 
key-value pairs. Take a look at 
http://spark.apache.org/docs/latest/programming-guide.html#working-with-key-value-pairs
 for details.
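
As a small illustration in the Scala API (a sketch; the Java methods map onto
the same ideas): map emits exactly one output element per input, flatMap emits
zero or more, and producing (key, value) tuples is what unlocks the extra
key-value operations such as reduceByKey or join, which is the role mapToPair
and flatMapToPair play in Java.

import org.apache.spark.SparkContext._   // brings in the implicit pair-RDD operations (pre-1.3)

val lines = sc.parallelize(Seq("a b", "b c c"))
val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1))   // becomes a pair RDD
val counts = pairs.reduceByKey(_ + _)                            // key-value operation
counts.collect().foreach(println)                                // (a,1), (b,2), (c,2)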

Matei

On Jul 24, 2014, at 3:41 PM, abhiguruvayya  wrote:

> Can any one help me understand the key difference between mapToPair vs
> flatMapToPair vs flatMap functions and also when to apply these functions in
> particular.
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/mapToPair-vs-flatMapToPair-vs-flatMap-function-usage-tp10617.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Simple record matching using Spark SQL

2014-07-24 Thread Yin Huai
Hi Sarath,

Have you tried the current branch 1.0? If not, can you give it a try and
see if the problem can be resolved?

Thanks,

Yin


On Thu, Jul 24, 2014 at 11:17 AM, Yin Huai  wrote:

> Hi Sarath,
>
> I will try to reproduce the problem.
>
> Thanks,
>
> Yin
>
>
> On Wed, Jul 23, 2014 at 11:32 PM, Sarath Chandra <
> sarathchandra.jos...@algofusiontech.com> wrote:
>
>> Hi Michael,
>>
>> Sorry for the delayed response.
>>
>> I'm using Spark 1.0.1 (pre-built version for hadoop 1). I'm running spark
>> programs on a standalone spark cluster using 2 nodes. One node works as
>> both master and worker while other node is just a worker.
>>
>> I didn't quite get what you meant when you asked for "jstack of the driver and
>> executor". So I'm attaching the log files generated in $SPARK_HOME/logs and
>> stdout and stderr files for this job in $SPARK_HOME/work folder from both
>> the nodes.
>>
>> Also attaching the program which I executed. If I uncomment the lines 36
>> & 37 the program works fine, otherwise it just keeps running forever.
>>
>> ~Sarath.
>>
>>
>> On Thu, Jul 17, 2014 at 9:35 PM, Michael Armbrust > > wrote:
>>
>>> What version are you running?  Could you provide a jstack of the driver
>>> and executor when it is hanging?
>>>
>>>
>>> On Thu, Jul 17, 2014 at 10:55 AM, Sarath Chandra <
>>> sarathchandra.jos...@algofusiontech.com> wrote:
>>>
 Added below 2 lines just before the sql query line -
 ...
 file1_schema.count;
 file2_schema.count;
 ...
 and it started working. But I couldn't get the reason.

 Can someone please explain me? What was happening earlier and what is
 happening with addition of these 2 lines?

 ~Sarath


 On Thu, Jul 17, 2014 at 1:13 PM, Sarath Chandra <
 sarathchandra.jos...@algofusiontech.com> wrote:

> No Sonal, I'm not doing any explicit call to stop context.
>
> If you see my previous post to Michael, the commented portion of the
> code is my requirement. When I run this over standalone spark cluster, the
> execution keeps running with no output or error. After waiting for several
> minutes I'm killing it by pressing Ctrl+C in the terminal.
>
> But the same code runs perfectly when executed from spark shell.
>
> ~Sarath
>
>
> On Thu, Jul 17, 2014 at 1:05 PM, Sonal Goyal 
> wrote:
>
>> Hi Sarath,
>>
>> Are you explicitly stopping the context?
>>
>> sc.stop()
>>
>>
>>
>>
>> Best Regards,
>> Sonal
>> Nube Technologies 
>>
>> 
>>
>>
>>
>>
>> On Thu, Jul 17, 2014 at 12:51 PM, Sarath Chandra <
>> sarathchandra.jos...@algofusiontech.com> wrote:
>>
>>> Hi Michael, Soumya,
>>>
>>> Can you please check and let me know what is the issue? what am I
>>> missing?
>>> Let me know if you need any logs to analyze.
>>>
>>> ~Sarath
>>>
>>>
>>> On Wed, Jul 16, 2014 at 8:24 PM, Sarath Chandra <
>>> sarathchandra.jos...@algofusiontech.com> wrote:
>>>
 Hi Michael,

 Tried it. It's correctly printing the line counts of both the
 files. Here's what I tried -

 Code:
 package test
 object Test4 {
   case class Test(fld1: String,
    fld2: String,
    fld3: String,
    fld4: String,
    fld5: String,
    fld6: Double,
    fld7: String);
   def main(args: Array[String]) {
     val conf = new SparkConf()
       .setMaster(args(0))
       .setAppName("SQLTest")
       .setSparkHome(args(1))
       .set("spark.executor.memory", "2g");
     val sc = new SparkContext(conf);
     sc.addJar("test1-0.1.jar");
     val file1 = sc.textFile(args(2));
     println(file1.count());
     val file2 = sc.textFile(args(3));
     println(file2.count());
     //val sq = new SQLContext(sc);
     //import sq._
     //val file1_recs: RDD[Test] = file1.map(_.split(",")).map(l =>
     //  Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)));
     //val file2_recs: RDD[Test] = file2.map(_.split(",")).map(s =>
     //  Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)));
     //val file1_schema = sq.createSchemaRDD(file1_recs);
     //val file2_schema = sq.createSchemaRDD(file2_recs);
     //file1_schema.registerAsTable("file1_tab");
     //file2_schema.registerAsTable("file2_tab");
     //val matched = sq.sql("select * from file1_tab l join file2_tab s on " +
     //  "l.fld7=s.fld7 where l.fld2=s.fld2 and " +
     //  "l.fld3=s.fld3 and l.fld4=s.fld4 and " +
     //  "l.fld6=s.fld6");
     //matched.collect().foreach(println);
>>>

Re: NotSerializableException in Spark Streaming

2014-07-24 Thread Nicholas Chammas
Yep, here goes!

Here are my environment vitals:

   - Spark 1.0.0
   - EC2 cluster with 1 slave spun up using spark-ec2
   - twitter4j 3.0.3
   - spark-shell called with --jars argument to load
   spark-streaming-twitter_2.10-1.0.0.jar as well as all the twitter4j
   jars.

Now, while I’m in the Spark shell, I enter the following:

import twitter4j.auth.{Authorization, OAuthAuthorization}
import twitter4j.conf.ConfigurationBuilder
import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
def getAuth(): Option[Authorization] = {

  System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
  System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
  System.setProperty("twitter4j.oauth.accessToken", "accessToken")
  System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")

  Some(new OAuthAuthorization(new ConfigurationBuilder().build()))

}
def noop(a: Any): Any = {
  a
}
val ssc = new StreamingContext(sc, Seconds(5))
val liveTweetObjects = TwitterUtils.createStream(ssc, getAuth())
val liveTweets = liveTweetObjects.map(_.getText)

liveTweets.map(t => noop(t)).print()

ssc.start()

So basically, I’m just printing Tweets as-is, but first I’m mapping them to
themselves via noop(). The Tweets will start to flow just fine for a minute
or so, and then, this:

14/07/24 23:13:30 ERROR JobScheduler: Error running job streaming job
140624361 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure:
Task not serializable: java.io.NotSerializableException:
org.apache.spark.streaming.StreamingContext
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
at 
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

The time-to-first-error is variable.

This is the simplest repro I can show at this time. Doing more complex
things with liveTweets that involve a KMeansModel, for example, will be
interrupted quicker by this java.io.NotSerializableException. I don’t know
if the root cause is the same, but the error certainly is.

By the way, trying to reproduce this on 1.0.1 doesn’t raise the same error,
but I can’t dig deeper to make sure this is really resolved (e.g. by trying
more complex things that need data) due to SPARK-2471. I see that that issue
has been resolved, so I’ll try this whole process again using the latest
from master and see how it goes.

Nick


On Tue, Jul 15, 2014 at 5:58 PM, Tathagata Das 
wrote:

> I am very curious though. Can you post a concise code example which we can
> run to reproduce this problem?
>
> TD
>
>
> On Tue, Jul 15, 2014 at 2:54 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> I am not entirely sure off the top of my head. But a possible (usually
>> works) workaround is to define the function as a val instead of a def. For
>> example
>>
>> def func(i: Int): Boolean = { true }
>>
>> can be written as
>>
>> val func = (i: Int) => { true }
>>
>> Hope this helps for now.
>>
>> TD
>>
>>
>> On Tue, Jul 15, 2014 at 9:21 AM, Nicholas Chammas <
>> nicholas.cham...@gmail.com> wrote:
>>
>>> Hey Diana,
>>>
>>> Did you ever figure this out?
>>>
>>> I’m running into the 

Re: streaming sequence files?

2014-07-24 Thread Barnaby
I have the streaming program writing sequence files. I can find one of the
files and load it in the shell using:

scala> val rdd = sc.sequenceFile[String,
Int]("tachyon://localhost:19998/files/WordCounts/20140724-213930")
14/07/24 21:47:50 INFO storage.MemoryStore: ensureFreeSpace(32856) called
with curMem=0, maxMem=309225062
14/07/24 21:47:50 INFO storage.MemoryStore: Block broadcast_0 stored as
values to memory (estimated size 32.1 KB, free 294.9 MB)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[1] at sequenceFile
at <console>:12

So I got some type information, seems good.

It took a while to research but I got the following streaming code to
compile and run:

val wordCounts = ssc.fileStream[String, Int, SequenceFileInputFormat[String,
Int]](args(0))

It works now and I offer this for reference to anybody else who may be
curious about saving sequence files and then streaming them back in.

Question:
When running both streaming programs at the same time using spark-submit I
noticed that only one app would really run. To get the one app to continue I
had to stop the other app. Is there a way to get these running
simultaneously?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/streaming-sequence-files-tp10557p10620.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Getting the number of slaves

2014-07-24 Thread Nicolas Mai
Thanks, this is what I needed :) I should have searched more...

Something I noticed though: after the SparkContext is initialized, I had to
wait for a few seconds until sc.getExecutorStorageStatus.length returns the
correct number of workers in my cluster (otherwise it returns 1, for the
driver)...
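
A minimal sketch of that wait (an assumption, not something from the thread):
block until the expected number of executors has registered, remembering that
getExecutorStorageStatus also counts the driver.

import org.apache.spark.SparkContext

def waitForExecutors(sc: SparkContext, expected: Int, timeoutMs: Long = 30000L): Int = {
  val deadline = System.currentTimeMillis + timeoutMs
  def workers = sc.getExecutorStorageStatus.length - 1   // subtract the driver's entry
  while (workers < expected && System.currentTimeMillis < deadline) {
    Thread.sleep(200)
  }
  workers
}

// e.g. size parallelism off the number of registered workers
val numWorkers = waitForExecutors(sc, expected = 20)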



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Getting-the-number-of-slaves-tp10604p10619.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext

2014-07-24 Thread Tathagata Das
You can set the Java option "-Dsun.io.serialization.extendedDebugInfo=true" to
have more information about the object printed. It will help you trace down
how the SparkContext is getting included in some kind of closure.
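
One way to wire that in (a sketch; assumes Spark 1.0+): pass the flag to the
executors through spark.executor.extraJavaOptions, and give the same flag to the
driver JVM on its launch command line as well.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("SerializationDebug")
  .set("spark.executor.extraJavaOptions",
       "-Dsun.io.serialization.extendedDebugInfo=true")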

TD


On Thu, Jul 24, 2014 at 9:48 AM, lihu  wrote:

> Which code did you use? Is the problem caused by your own code or something
> in Spark itself?
>
>
> On Tue, Jul 22, 2014 at 8:50 AM, hsy...@gmail.com 
> wrote:
>
>> I have the same problem
>>
>>
>> On Sat, Jul 19, 2014 at 12:31 AM, lihu  wrote:
>>
>>> Hi,
>>> Everyone.  I have a piece of code, shown below. When I run it,
>>> I get the error below; it seems that the SparkContext is not
>>> serializable, but I do not use the SparkContext except for the broadcast.
>>> [In fact, this code is in MLlib; I just try to broadcast the
>>>  centerArrays.]
>>>
>>> It succeeds in the reduceByKey operation, but fails at the
>>> collect operation, which confuses me.
>>>
>>>
>>> INFO DAGScheduler: Failed to run collect at KMeans.scala:235
>>> [error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task
>>> not serializable: java.io.NotSerializableException:
>>> org.apache.spark.SparkContext
>>> org.apache.spark.SparkException: Job aborted: Task not serializable:
>>> java.io.NotSerializableException: org.apache.spark.SparkContext
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>>>  at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>>>
>>>
>>>
>>>
>>> private def initKMeansParallel(data: RDD[Array[Double]]):
>>> Array[ClusterCenters] = {
>>>
>>> @transient val sc = data.sparkContext   // I try to add the 
>>> transient
>>> annotation here, but it doesn't work
>>>
>>> // Initialize each run's center to a random point
>>> val seed = new XORShiftRandom().nextInt()
>>> val sample = data.takeSample(true, runs, seed).toSeq
>>> val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))
>>>
>>> // On each step, sample 2 * k points on average for each run with
>>> probability proportional
>>> // to their squared distance from that run's current centers
>>> for (step <- 0 until initializationSteps) {
>>>   val centerArrays = sc.broadcast(centers.map(_.toArray))
>>>   val sumCosts = data.flatMap { point =>
>>> for (r <- 0 until runs) yield (r,
>>> KMeans.pointCost(centerArrays.value(r), point))
>>>   }.reduceByKey(_ + _).collectAsMap()
>>> //can pass at this point
>>>   val chosen = data.mapPartitionsWithIndex { (index, points) =>
>>> val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
>>> for {
>>>   p <- points
>>>   r <- 0 until runs
>>>   if rand.nextDouble() < KMeans.pointCost(centerArrays.value(r),
>>> p) * 2 * k / sumCosts(r)
>>> } yield (r, p)
>>>   }.collect()
>>> // failed at this
>>> point.
>>>   for ((r, p) <- chosen) {
>>> centers(r) += p
>>>   }
>>> }
>>>
>>>
>>>
>>>
>>>
>>
>
>
> --
> *Best Wishes!*
>
>  *Li Hu(李浒) | Graduate Student*
>
> *Institute for Interdisciplinary Information Sciences (IIIS)*
> *Tsinghua University, China*
>
> *Email: lihu...@gmail.com *
> *Tel  : +86 15120081920*
> *Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/*
>
>
>


mapToPair vs flatMapToPair vs flatMap function usage.

2014-07-24 Thread abhiguruvayya
Can any one help me understand the key difference between mapToPair vs
flatMapToPair vs flatMap functions and also when to apply these functions in
particular.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/mapToPair-vs-flatMapToPair-vs-flatMap-function-usage-tp10617.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


cache changes precision

2014-07-24 Thread Ron Gonzalez
Hi,
  I'm doing the following:

  def main(args: Array[String]) = {
    val sparkConf = new SparkConf().setAppName("AvroTest").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val conf = new Configuration()
    val job = new Job(conf)
    val path = new Path("/tmp/a.avro");
    val schema = AvroUtils.getSchema(conf, path);

    AvroJob.setInputKeySchema(job, schema);
    
    val rdd = sc.newAPIHadoopFile(
       path.toString(),
       classOf[AvroKeyInputFormat[GenericRecord]],
       classOf[AvroKey[GenericRecord]],
       classOf[NullWritable], conf).map(x => x._1.datum())
    val sum = rdd.map(p => p.get("SEPAL_WIDTH").asInstanceOf[Float]).reduce(_ + 
_)
    val avg = sum/rdd.count()
    println(s"Sum = $sum")
    println(s"Avg = $avg")
  }

If I run this, it works as expected. When I add .cache() to 

val rdd = sc.newAPIHadoopFile(
       path.toString(),
       classOf[AvroKeyInputFormat[GenericRecord]],
       classOf[AvroKey[GenericRecord]],
       classOf[NullWritable], conf).map(x => x._1.datum()).cache()

then the command rounds up the average.

Any idea why this works this way? Any tips on how to fix this?

Thanks,
Ron


KMeans: expensiveness of large vectors

2014-07-24 Thread durin
As a source, I have a textfile with n rows that each contain m
comma-separated integers. 
Each row is then converted into a feature vector with m features each.

I've noticed, that given the same total filesize and number of features, a
larger number of columns is much more expensive for training a KMeans model
than a large number of rows.

To give an example:
10k rows X 1k columns took 21 seconds on my cluster, whereas 1k rows X 10k
columns took 1min47s. Both files had a size of 238M.

Can someone explain what in the implementation of KMeans causes large
vectors to be so much more expensive than having many of these vectors?
A pointer to the exact part of the source would be fantastic, but even a
general explanation would help me.


Best regards,
Simon 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-expensiveness-of-large-vectors-tp10614.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: continuing processing when errors occur

2014-07-24 Thread Imran Rashid
whoops!  just realized I was retrying the function even on success.  didn't
pay enough attention to the output from my calls.  Slightly updated
definitions:

class RetryFunction[-A](nTries: Int,f: A => Unit) extends Function[A,Unit]
{
  def apply(a: A): Unit = {
var tries = 0
var success = false
while(!success && tries < nTries) {
  tries += 1
  try {
f(a)
success = true
  } catch {
case scala.util.control.NonFatal(ex) =>
  println(s"failed on input $a, try $tries with $ex")
  }
}
  }
}

implicit class Retryable[A](f: A => Unit) {
  def retryable(nTries:Int): RetryFunction[A] = new RetryFunction(nTries,f)
}


def tenDiv(x:Int) = println(x + " ---> " + (10 / x))


and example usage:

scala> (-2 to 2).foreach{(tenDiv _).retryable(3)}
-2 ---> -5
-1 ---> -10
failed on input 0, try 1 with java.lang.ArithmeticException: / by zero
failed on input 0, try 2 with java.lang.ArithmeticException: / by zero
failed on input 0, try 3 with java.lang.ArithmeticException: / by zero
1 ---> 10
2 ---> 5
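
(As an aside, a more functional sketch of the same idea, not from the thread:
replace the while loop with a lazy Iterator and scala.util.Try.)

import scala.util.Try

def retry[A](nTries: Int)(f: A => Unit): A => Unit = { a =>
  val attempts = Iterator.range(1, nTries + 1).map { i =>
    val result = Try(f(a))   // Try catches non-fatal exceptions, like the original
    result.failed.foreach(ex => println(s"failed on input $a, try $i with $ex"))
    result
  }
  // exists is lazy on an Iterator, so it stops retrying at the first success
  if (!attempts.exists(_.isSuccess))
    println(s"gave up on input $a after $nTries tries")
}

// usage: (-2 to 2).foreach(retry(3)(tenDiv _))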





On Thu, Jul 24, 2014 at 2:58 PM, Imran Rashid  wrote:

> Hi Art,
>
> I have some advice that isn't spark-specific at all, so it doesn't
> *exactly* address your questions, but you might still find helpful.  I
> think using an implicit to add your retyring behavior might be useful.  I
> can think of two options:
>
> 1. enriching RDD itself, eg. to add a .retryForeach, which would have the
> desired behavior.
>
> 2. enriching Function to create a variant with retry behavior.
>
> I prefer option 2, because it could be useful outside of spark, and even
> within spark, you might realize you want to do something similar for more
> than just foreach.
>
> Here's an example.  (probably there is a more functional way to do this,
> to avoid the while loop, but my brain isn't working and that's not the
> point of this anyway)
>
> Lets say we have this function:
>
> def tenDiv(x:Int) = println(10 / x)
>
> and we try applying it to a normal old Range:
>
> scala> (-10 to 10).foreach{tenDiv}
> -1
> -1
> -1
> -1
> -1
> -2
> -2
> -3
> -5
> -10
> java.lang.ArithmeticException: / by zero
> at .tenDiv(<console>:7)
>
>
> We can enrich Function to add some retry behavior:
>
> class RetryFunction[-A](nTries: Int,f: A => Unit) extends Function[A,Unit]
> {
>   def apply(a: A): Unit = {
> var tries = 0
> var success = false
> while(!success && tries < nTries) {
>   tries += 1
>   try {
> f(a)
>   } catch {
> case scala.util.control.NonFatal(ex) =>
>   println(s"failed on try $tries with $ex")
>   }
> }
>   }
> }
>
> implicit class Retryable[A](f: A => Unit) {
>   def retryable(nTries:Int): RetryFunction[A] = new RetryFunction(nTries,f)
> }
>
>
>
> We "activate" this behavior by calling .retryable(nTries) on our method.
> Like so:
>
> scala> (-2 to 2).foreach{(tenDiv _).retryable(1)}
> -5
> -10
> failed on try 1 with java.lang.ArithmeticException: / by zero
> 10
> 5
>
> scala> (-2 to 2).foreach{(tenDiv _).retryable(3)}
> -5
> -5
> -5
> -10
> -10
> -10
> failed on try 1 with java.lang.ArithmeticException: / by zero
> failed on try 2 with java.lang.ArithmeticException: / by zero
> failed on try 3 with java.lang.ArithmeticException: / by zero
> 10
> 10
> 10
> 5
> 5
> 5
>
>
> You could do the same thing on closures you pass to RDD.foreach.
>
> I should add that I'm often very hesitant to use implicits because it can
> make it harder to follow what's going on in the code.  I think this version
> is OK, though, b/c somebody coming along later and looking at the code at
> least can see the call to "retryable" as a clue.  (I really dislike
> implicit conversions that happen without any hints in the actual code.)
> Hopefully that's enough of a hint for others to figure out what is going
> on.  Eg., intellij will know where that method came from and jump to it,
> and also if you make the name unique enough, you can probably find it with
> plain text search / c-tags.  But, its definitely worth considering for
> yourself.
>
> hope this helps,
> Imran
>
>
>
> On Thu, Jul 24, 2014 at 1:12 PM, Art Peel  wrote:
>
>> Our system works with RDDs generated from Hadoop files. It processes each
>> record in a Hadoop file and for a subset of those records generates output
>> that is written to an external system via RDD.foreach. There are no
>> dependencies between the records that are processed.
>>
>> If writing to the external system fails (due to a detail of what is being
>> written) and throws an exception, I see the following behavior:
>>
>> 1. Spark retries the entire partition (thus wasting time and effort),
>> reaches the problem record and fails again.
>> 2. It repeats step 1 up to the default 4 tries and then gives up. As a
>> result, the rest of records from that Hadoop file are not processed.
>> 3. The executor where the 4th failure occurred is marked as failed and
>> told to shut down and thus I lose a core for processing the remaining
>> H

Re: Configuring Spark Memory

2014-07-24 Thread John Omernik
So this is good information for standalone, but how is memory distributed
within Mesos?  There's coarse-grained mode where the executor stays active, and
there's fine-grained mode where it appears each task is its own process in
Mesos. How do memory allocations work in these cases? Thanks!



On Thu, Jul 24, 2014 at 12:14 PM, Martin Goodson 
wrote:

> Great - thanks for the clarification Aaron. The offer stands for me to
> write some documentation and an example that covers this without leaving
> *any* room for ambiguity.
>
>
>
>
> --
> Martin Goodson  |  VP Data Science
> (0)20 3397 1240
>
>
> On Thu, Jul 24, 2014 at 6:09 PM, Aaron Davidson 
> wrote:
>
>> Whoops, I was mistaken in my original post last year. By default, there
>> is one executor per node per Spark Context, as you said.
>> "spark.executor.memory" is the amount of memory that the application
>> requests for each of its executors. SPARK_WORKER_MEMORY is the amount of
>> memory a Spark Worker is willing to allocate in executors.
>>
>> So if you were to set SPARK_WORKER_MEMORY to 8g everywhere on your
>> cluster, and spark.executor.memory to 4g, you would be able to run 2
>> simultaneous Spark Contexts who get 4g per node. Similarly, if
>> spark.executor.memory were 8g, you could only run 1 Spark Context at a time
>> on the cluster, but it would get all the cluster's memory.
>>
>>
>> On Thu, Jul 24, 2014 at 7:25 AM, Martin Goodson 
>> wrote:
>>
>>> Thank you Nishkam,
>>> I have read your code. So, for the sake of my understanding, it seems
>>> that for each spark context there is one executor per node? Can anyone
>>> confirm this?
>>>
>>>
>>> --
>>> Martin Goodson  |  VP Data Science
>>> (0)20 3397 1240
>>>
>>>
>>> On Thu, Jul 24, 2014 at 6:12 AM, Nishkam Ravi 
>>> wrote:
>>>
 See if this helps:

 https://github.com/nishkamravi2/SparkAutoConfig/

 It's a very simple tool for auto-configuring default parameters in
 Spark. Takes as input high-level parameters (like number of nodes, cores
 per node, memory per node, etc) and spits out default configuration, user
 advice and command line. Compile (javac SparkConfigure.java) and run (java
 SparkConfigure).

 Also cc'ing dev in case others are interested in helping evolve this
 over time (by refining the heuristics and adding more parameters).


  On Wed, Jul 23, 2014 at 8:31 AM, Martin Goodson 
 wrote:

> Thanks Andrew,
>
> So if there is only one SparkContext there is only one executor per
> machine? This seems to contradict Aaron's message from the link above:
>
> "If each machine has 16 GB of RAM and 4 cores, for example, you might
> set spark.executor.memory between 2 and 3 GB, totaling 8-12 GB used by
> Spark.)"
>
> Am I reading this incorrectly?
>
> Anyway our configuration is 21 machines (one master and 20 slaves)
> each with 60Gb. We would like to use 4 cores per machine. This is pyspark
> so we want to leave say 16Gb on each machine for python processes.
>
> Thanks again for the advice!
>
>
>
> --
> Martin Goodson  |  VP Data Science
> (0)20 3397 1240
>
>
> On Wed, Jul 23, 2014 at 4:19 PM, Andrew Ash 
> wrote:
>
>> Hi Martin,
>>
>> In standalone mode, each SparkContext you initialize gets its own set
>> of executors across the cluster.  So for example if you have two shells
>> open, they'll each get two JVMs on each worker machine in the cluster.
>>
>> As far as the other docs, you can configure the total number of cores
>> requested for the SparkContext, the amount of memory for the executor JVM
>> on each machine, the amount of memory for the Master/Worker daemons 
>> (little
>> needed since work is done in executors), and several other settings.
>>
>> Which of those are you interested in?  What spec hardware do you have
>> and how do you want to configure it?
>>
>> Andrew
>>
>>
>> On Wed, Jul 23, 2014 at 6:10 AM, Martin Goodson > > wrote:
>>
>>> We are having difficulties configuring Spark, partly because we
>>> still don't understand some key concepts. For instance, how many 
>>> executors
>>> are there per machine in standalone mode? This is after having
>>> closely read the documentation several times:
>>>
>>> *http://spark.apache.org/docs/latest/configuration.html*
>>> *http://spark.apache.org/docs/latest/spark-standalone.html*
>>> *http://spark.apache.org/docs/latest/tuning.html*
>>> *http://spark.apache.org/docs/latest/cluster-overview.html*
>>>
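
A minimal sketch of the standalone-mode arithmetic from Aaron's reply quoted
above (the values are examples, not recommendations):

import org.apache.spark.SparkConf

// conf/spark-env.sh on every worker:  SPARK_WORKER_MEMORY=8g
// per application:
val conf = new SparkConf()
  .setAppName("MemoryExample")
  .set("spark.executor.memory", "4g")
// Each 8g worker can then host 8g / 4g = 2 executors, so two SparkContexts can
// run concurrently with one 4g executor per node; at 8g per executor, only one
// context fits at a time but it gets all of the worker's memory.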

Re: continuing processing when errors occur

2014-07-24 Thread Imran Rashid
Hi Art,

I have some advice that isn't spark-specific at all, so it doesn't
*exactly* address your questions, but you might still find it helpful.  I
think using an implicit to add your retrying behavior might be useful.  I
can think of two options:

1. enriching RDD itself, eg. to add a .retryForeach, which would have the
desired behavior.

2. enriching Function to create a variant with retry behavior.

I prefer option 2, because it could be useful outside of spark, and even
within spark, you might realize you want to do something similar for more
than just foreach.

Here's an example.  (probably there is a more functional way to do this, to
avoid the while loop, but my brain isn't working and that's not the point
of this anyway)

Lets say we have this function:

def tenDiv(x:Int) = println(10 / x)

and we try applying it to a normal old Range:

scala> (-10 to 10).foreach{tenDiv}
-1
-1
-1
-1
-1
-2
-2
-3
-5
-10
java.lang.ArithmeticException: / by zero
at .tenDiv(<console>:7)


We can enrich Function to add some retry behavior:

class RetryFunction[-A](nTries: Int,f: A => Unit) extends Function[A,Unit]
{
  def apply(a: A): Unit = {
var tries = 0
var success = false
while(!success && tries < nTries) {
  tries += 1
  try {
f(a)
  } catch {
case scala.util.control.NonFatal(ex) =>
  println(s"failed on try $tries with $ex")
  }
}
  }
}

implicit class Retryable[A](f: A => Unit) {
  def retryable(nTries:Int): RetryFunction[A] = new RetryFunction(nTries,f)
}



We "activate" this behavior by calling .retryable(nTries) on our method.
Like so:

scala> (-2 to 2).foreach{(tenDiv _).retryable(1)}
-5
-10
failed on try 1 with java.lang.ArithmeticException: / by zero
10
5

scala> (-2 to 2).foreach{(tenDiv _).retryable(3)}
-5
-5
-5
-10
-10
-10
failed on try 1 with java.lang.ArithmeticException: / by zero
failed on try 2 with java.lang.ArithmeticException: / by zero
failed on try 3 with java.lang.ArithmeticException: / by zero
10
10
10
5
5
5


You could do the same thing on closures you pass to RDD.foreach.

I should add that I'm often very hesitant to use implicits because it can
make it harder to follow what's going on in the code.  I think this version
is OK, though, b/c somebody coming along later and looking at the code at
least can see the call to "retryable" as a clue.  (I really dislike
implicit conversions that happen without any hints in the actual code.)
Hopefully that's enough of a hint for others to figure out what is going
on.  Eg., intellij will know where that method came from and jump to it,
and also if you make the name unique enough, you can probably find it with
plain text search / c-tags.  But, its definitely worth considering for
yourself.

hope this helps,
Imran



On Thu, Jul 24, 2014 at 1:12 PM, Art Peel  wrote:

> Our system works with RDDs generated from Hadoop files. It processes each
> record in a Hadoop file and for a subset of those records generates output
> that is written to an external system via RDD.foreach. There are no
> dependencies between the records that are processed.
>
> If writing to the external system fails (due to a detail of what is being
> written) and throws an exception, I see the following behavior:
>
> 1. Spark retries the entire partition (thus wasting time and effort),
> reaches the problem record and fails again.
> 2. It repeats step 1 up to the default 4 tries and then gives up. As a
> result, the rest of records from that Hadoop file are not processed.
> 3. The executor where the 4th failure occurred is marked as failed and
> told to shut down and thus I lose a core for processing the remaining
> Hadoop files, thus slowing down the entire process.
>
>
> For this particular problem, I know how to prevent the underlying
> exception, but I'd still like to get a handle on error handling for future
> situations that I haven't yet encountered.
>
> My goal is this:
> Retry the problem record only (rather than starting over at the beginning
> of the partition) up to N times, then give up and move on to process the
> rest of the partition.
>
> As far as I can tell, I need to supply my own retry behavior and if I want
> to process records after the problem record I have to swallow exceptions
> inside the foreach block.
>
> My 2 questions are:
> 1. Is there anything I can do to prevent the executor from being shut down
> when a failure occurs?
>
>
> 2. Are there ways Spark can help me get closer to my goal of retrying only
> the problem record without writing my own re-try code and swallowing
> exceptions?
>
> Regards,
> Art
>
>


Kmeans: set initial centers explicitly

2014-07-24 Thread SK

Hi,

The mllib.clustering.kmeans implementation supports a random or parallel
initialization mode to pick the initial centers. is there a way to specify
the initial centers explictly? It would be useful to have a setCenters()
method where we can explicitly specify the initial centers. (For e.g. R
allows us to specify the initial centers.)  

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kmeans-set-initial-centers-explicitly-tp10609.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Emacs Setup Anyone?

2014-07-24 Thread Steve Nunez
Anyone out there have a good configuration for emacs? Scala-mode sort of
works, but I'd love to see a fully-supported spark-mode with an inferior
shell. Searching didn't turn up much of anything.

Any emacs users out there? What setup are you using?

Cheers,
- SteveN







Re: Getting the number of slaves

2014-07-24 Thread Evan R. Sparks
Try sc.getExecutorStorageStatus().length

SparkContext's getExecutorMemoryStatus or getExecutorStorageStatus will
give you back an object per executor - the StorageStatus objects are what
drives a lot of the Spark Web UI.

https://spark.apache.org/docs/1.0.1/api/scala/index.html#org.apache.spark.SparkContext
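
A rough sketch of wiring that into partition counts (Spark 1.0.x; note the
storage-status array usually includes an entry for the driver's own
BlockManager, and the cores-per-executor and parallelism numbers below are
just assumptions):

val numExecutors = sc.getExecutorStorageStatus.length - 1     // minus the driver's entry
val coresPerExecutor = 4                                      // assumed cluster spec
val tasksPerCore = 3                                          // assumed tuning knob
val numPartitions = math.max(1, numExecutors * coresPerExecutor * tasksPerCore)

val data = sc.textFile("hdfs:///some/input", numPartitions)   // hypothetical path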


On Thu, Jul 24, 2014 at 11:16 AM, Nicolas Mai  wrote:

> Hi,
>
> Is there a way to get the number of slaves/workers during runtime?
>
> I searched online but didn't find anything :/ The application I'm working
> will run on different clusters corresponding to different deployment stages
> (beta -> prod). It would be great to get the number of slaves currently in
> use, in order set the level of parallelism and RDD partitions, based on
> that
> number.
>
> Thanks!
> Nicolas
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Getting-the-number-of-slaves-tp10604.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Spark Training at Scala By the Bay with Databricks, Fast Track to Scala

2014-07-24 Thread Alexy Khrabrov
Scala By the Bay (www.scalabythebay.org) is happy to confirm that our
Spark training on August 11-12 will be run by Databricks and By the
Bay together.  It will be focused on Scala, and is the first Spark
Training at a major Scala venue.  Spark is written in Scala, with the
unified data pipeline and interactive REPL and streaming all taking
advantage of powerful Scala DSL capabilities.  Type safety ensures
that your program will not have run-time errors due to typos, and JVM
performance wins for machine learning and compute-intensive
applications.  Type inference keeps code clear and concise in Scala
while keeping all strong typing guarantees, and immutability enables
concurrency on multicores as well as across the cluster.  If you're
serious about performance and enterprise maintainability, you should
use Spark with Scala.

We're offering Typesafe-certified Fast Track to Scala (FTTS) course at
Scala By the Bay.  To celebrate our becoming a Typesafe Certified
Training Partner, we're happy to extend to Spark users a $300 discount
for the FTTS -- use code SPARKSCALABYTHEBAY.  FTTS is taught on August
6-7 by Brendan W. McAdams.  Brendan is an Akka contributor and the
author of MongoDB Scala drivers, including reactive ones, and one of
the best Scala teachers in the world.  His notes on the course are
here: http://www.scalabythebay.org/files/scala_training_deck.pdf.

This is the best intensive ramp-up Scala course you could possibly
take.  If your company is choosing Scala, there's no better way to get
up to speed on the whole Scala ecosystem by attending the training and
then the conference (and possibly Spark training afterwards).  As you
dive deeper into Spark, you will want to use it in Scala.

Our trainings are fairly intimate affairs, limited to 25 people each.
It means you will get individual attention for your skill level.
Please share this course information with your colleagues who are
planning to learn Scala and Spark.

Full training info: http://www.scalabythebay.org/training.html#training.
Group discounts are available on 5 or more registrations; contact
registrat...@scalabythebay.org for more information about those
or if you have any other questions.

Cheers,
A+ (Alexy) and Scala By the Bay


Re: Simple record matching using Spark SQL

2014-07-24 Thread Yin Huai
Hi Sarath,

I will try to reproduce the problem.

Thanks,

Yin


On Wed, Jul 23, 2014 at 11:32 PM, Sarath Chandra <
sarathchandra.jos...@algofusiontech.com> wrote:

> Hi Michael,
>
> Sorry for the delayed response.
>
> I'm using Spark 1.0.1 (pre-built version for hadoop 1). I'm running spark
> programs on a standalone spark cluster using 2 nodes. One node works as
> both master and worker while other node is just a worker.
>
> I didn't quite get what you meant when you asked for a "jstack of the driver
> and executor". So I'm attaching the log files generated in $SPARK_HOME/logs and stdout and
> stderr files for this job in $SPARK_HOME/work folder from both the nodes.
>
> Also attaching the program which I executed. If I uncomment the lines 36 &
> 37 the program works fine, otherwise it just keeps running forever.
>
> ~Sarath.
>
>
> On Thu, Jul 17, 2014 at 9:35 PM, Michael Armbrust 
> wrote:
>
>> What version are you running?  Could you provide a jstack of the driver
>> and executor when it is hanging?
>>
>>
>> On Thu, Jul 17, 2014 at 10:55 AM, Sarath Chandra <
>> sarathchandra.jos...@algofusiontech.com> wrote:
>>
>>> Added below 2 lines just before the sql query line -
>>> ...
>>> file1_schema.count;
>>> file2_schema.count;
>>> ...
>>> and it started working. But I couldn't get the reason.
>>>
>>> Can someone please explain me? What was happening earlier and what is
>>> happening with addition of these 2 lines?
>>>
>>> ~Sarath
>>>
>>>
>>> On Thu, Jul 17, 2014 at 1:13 PM, Sarath Chandra <
>>> sarathchandra.jos...@algofusiontech.com> wrote:
>>>
 No Sonal, I'm not doing any explicit call to stop context.

 If you see my previous post to Michael, the commented portion of the
 code is my requirement. When I run this over standalone spark cluster, the
 execution keeps running with no output or error. After waiting for several
 minutes I'm killing it by pressing Ctrl+C in the terminal.

 But the same code runs perfectly when executed from spark shell.

 ~Sarath


 On Thu, Jul 17, 2014 at 1:05 PM, Sonal Goyal 
 wrote:

> Hi Sarath,
>
> Are you explicitly stopping the context?
>
> sc.stop()
>
>
>
>
> Best Regards,
> Sonal
> Nube Technologies 
>
> 
>
>
>
>
> On Thu, Jul 17, 2014 at 12:51 PM, Sarath Chandra <
> sarathchandra.jos...@algofusiontech.com> wrote:
>
>> Hi Michael, Soumya,
>>
>> Can you please check and let me know what is the issue? what am I
>> missing?
>> Let me know if you need any logs to analyze.
>>
>> ~Sarath
>>
>>
>> On Wed, Jul 16, 2014 at 8:24 PM, Sarath Chandra <
>> sarathchandra.jos...@algofusiontech.com> wrote:
>>
>>> Hi Michael,
>>>
>>> Tried it. It's correctly printing the line counts of both the files.
>>> Here's what I tried -
>>>
>>> Code:
>>> package test
>>> object Test4 {
>>>   case class Test(fld1: String,
>>>    fld2: String,
>>>    fld3: String,
>>>    fld4: String,
>>>    fld5: String,
>>>    fld6: Double,
>>>    fld7: String);
>>>   def main(args: Array[String]) {
>>> val conf = new SparkConf()
>>> .setMaster(args(0))
>>>  .setAppName("SQLTest")
>>>  .setSparkHome(args(1))
>>>  .set("spark.executor.memory", "2g");
>>> val sc = new SparkContext(conf);
>>> sc.addJar("test1-0.1.jar");
>>> val file1 = sc.textFile(args(2));
>>> println(file1.count());
>>> val file2 = sc.textFile(args(3));
>>> println(file2.count());
>>> //val sq = new SQLContext(sc);
>>> //import sq._
>>> //val file1_recs: RDD[Test] = file1.map(_.split(",")).map(l =>
>>> Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)));
>>> //val file2_recs: RDD[Test] = file2.map(_.split(",")).map(s =>
>>> Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)));
>>> //val file1_schema = sq.createSchemaRDD(file1_recs);
>>> //val file2_schema = sq.createSchemaRDD(file2_recs);
>>> //file1_schema.registerAsTable("file1_tab");
>>> //file2_schema.registerAsTable("file2_tab");
>>> //val matched = sq.sql("select * from file1_tab l join
>>> file2_tab s on " +
>>> // "l.fld7=s.fld7 where l.fld2=s.fld2 and " +
>>> // "l.fld3=s.fld3 and l.fld4=s.fld4 and " +
>>> // "l.fld6=s.fld6");
>>> //matched.collect().foreach(println);
>>>   }
>>> }
>>>
>>> Execution:
>>> export
>>> CLASSPATH=$HADOOP_PREFIX/conf:$SPARK_HOME/lib/*:test1-0.1.jar
>>> export CONFIG_OPTS="-Dspark.jars=test1-0.1.jar"
>>> java -cp $CLASSPATH $CONFIG_OPTS test.Test4 spark://master:7077
>>> "/usr/local/spark-1.0.1-bin-hadoop1"
>>> hdfs://master:54310/user/h

Getting the number of slaves

2014-07-24 Thread Nicolas Mai
Hi,

Is there a way to get the number of slaves/workers during runtime?

I searched online but didn't find anything :/ The application I'm working on
will run on different clusters corresponding to different deployment stages
(beta -> prod). It would be great to get the number of slaves currently in
use, in order to set the level of parallelism and RDD partitions based on that
number.

Thanks!
Nicolas



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Getting-the-number-of-slaves-tp10604.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


continuing processing when errors occur

2014-07-24 Thread Art Peel
Our system works with RDDs generated from Hadoop files. It processes each
record in a Hadoop file and for a subset of those records generates output
that is written to an external system via RDD.foreach. There are no
dependencies between the records that are processed.

If writing to the external system fails (due to a detail of what is being
written) and throws an exception, I see the following behavior:

1. Spark retries the entire partition (thus wasting time and effort),
reaches the problem record and fails again.
2. It repeats step 1 up to the default 4 tries and then gives up. As a
result, the rest of records from that Hadoop file are not processed.
3. The executor where the 4th failure occurred is marked as failed and told
to shut down and thus I lose a core for processing the remaining Hadoop
files, thus slowing down the entire process.

For this particular problem, I know how to prevent the underlying
exception, but I'd still like to get a handle on error handling for future
situations that I haven't yet encountered.

My goal is this:
Retry the problem record only (rather than starting over at the beginning
of the partition) up to N times, then give up and move on to process the
rest of the partition.

As far as I can tell, I need to supply my own retry behavior and if I want
to process records after the problem record I have to swallow exceptions
inside the foreach block.

My 2 questions are:
1. Is there anything I can do to prevent the executor from being shut down
when a failure occurs?

2. Are there ways Spark can help me get closer to my goal of retrying only
the problem record without writing my own re-try code and swallowing
exceptions?

Regards,
Art


GraphX canonical conflation issues

2014-07-24 Thread e5c
Hi there,

This issue has been mentioned in:

http://apache-spark-user-list.1001560.n3.nabble.com/Java-IO-Stream-Corrupted-Invalid-Type-AC-td6925.html
 

However I'm starting a new thread since the issue is distinct from the above
topic's designated subject.

I'm test-running canonical conflation on a ~100 MB graph (with hopes to
scale to 10 GB or more). I'm deploying on 5 r3.xlarge machines on AWS EMR
and using default configurations, with the exception of setting
spark.serializer as org.apache.spark.serializer.KryoSerializer.

The full stack-trace from canonical conflation is pasted below; it evidently
fails at: "Failed to run reduce at VertexRDD.scala:100". (The same app ran
just fine on very small input locally.) Has there been any progress in
identifying the underlying issues? Thanks!

14/07/24 16:29:37 INFO mapred.FileInputFormat: Total input paths to process
: 1
* About to run connected components *
14/07/24 16:29:37 INFO spark.SparkContext: Starting job: reduce at
VertexRDD.scala:100
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Registering RDD 5
(mapPartitions at VertexRDD.scala:423)
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Registering RDD 18
(mapPartitions at VertexRDD.scala:318)
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Registering RDD 22
(mapPartitions at VertexRDD.scala:318)
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Registering RDD 26
(mapPartitions at GraphImpl.scala:184)
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Got job 0 (reduce at
VertexRDD.scala:100) with 1 output partitions (allowLocal=false)
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Final stage: Stage 0(reduce
at VertexRDD.scala:100)
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Parents of final stage:
List(Stage 1, Stage 2)
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Missing parents: List(Stage
1, Stage 2)
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Submitting Stage 1
(VertexRDD.createRoutingTables - vid2pid (aggregation) MapPartitionsRDD[5]
at mapPartitions at VertexRDD.scala:423), which has no missing parents
14/07/24 16:29:37 INFO scheduler.DAGScheduler: Submitting 1 missing tasks
from Stage 1 (VertexRDD.createRoutingTables - vid2pid (aggregation)
MapPartitionsRDD[5] at mapPartitions at VertexRDD.scala:423)
14/07/24 16:29:37 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with
1 tasks
14/07/24 16:29:39 INFO cluster.SparkDeploySchedulerBackend: Registered
executor:
Actor[akka.tcp://sparkExecutor@ip-10-5-147-209.ec2.internal:60530/user/Executor#-2098248966]
with ID 0
14/07/24 16:29:39 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID
0 on executor 0: ip-10-5-147-209.ec2.internal (PROCESS_LOCAL)
14/07/24 16:29:39 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as
2300 bytes in 3 ms
14/07/24 16:29:39 INFO cluster.SparkDeploySchedulerBackend: Registered
executor:
Actor[akka.tcp://sparkExecutor@ip-10-167-166-70.ec2.internal:53470/user/Executor#-1954387250]
with ID 3
14/07/24 16:29:39 INFO cluster.SparkDeploySchedulerBackend: Registered
executor:
Actor[akka.tcp://sparkExecutor@ip-10-169-50-78.ec2.internal:37584/user/Executor#-247338355]
with ID 2
14/07/24 16:29:39 INFO cluster.SparkDeploySchedulerBackend: Registered
executor:
Actor[akka.tcp://sparkExecutor@ip-10-95-161-133.ec2.internal:55718/user/Executor#-2120787048]
with ID 1
14/07/24 16:29:39 INFO storage.BlockManagerInfo: Registering block manager
ip-10-167-166-70.ec2.internal:52351 with 294.9 MB RAM
14/07/24 16:29:39 INFO storage.BlockManagerInfo: Registering block manager
ip-10-5-147-209.ec2.internal:34712 with 294.9 MB RAM
14/07/24 16:29:39 INFO storage.BlockManagerInfo: Registering block manager
ip-10-169-50-78.ec2.internal:35244 with 294.9 MB RAM
14/07/24 16:29:39 INFO storage.BlockManagerInfo: Registering block manager
ip-10-95-161-133.ec2.internal:44976 with 294.9 MB RAM
14/07/24 16:30:09 INFO cluster.SparkDeploySchedulerBackend: Executor 0
disconnected, so removing it
14/07/24 16:30:09 INFO client.AppClient$ClientActor: Executor updated:
app-20140724162937-0004/0 is now EXITED (Command exited with code 52)
14/07/24 16:30:09 INFO cluster.SparkDeploySchedulerBackend: Executor
app-20140724162937-0004/0 removed: Command exited with code 52
14/07/24 16:30:09 ERROR scheduler.TaskSchedulerImpl: Lost executor 0 on
ip-10-5-147-209.ec2.internal: remote Akka client disassociated
14/07/24 16:30:09 INFO scheduler.TaskSetManager: Re-queueing tasks for 0
from TaskSet 1.0
14/07/24 16:30:10 WARN scheduler.TaskSetManager: Lost TID 0 (task 1.0:0)
14/07/24 16:30:10 INFO client.AppClient$ClientActor: Executor added:
app-20140724162937-0004/4 on
worker-20140724151003-ip-10-5-147-209.ec2.internal-55958
(ip-10-5-147-209.ec2.internal:55958) with 4 cores
14/07/24 16:30:10 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID
1 on executor 1: ip-10-95-161-133.ec2.internal (PROCESS_LOCAL)
14/07/24 16:30:10 INFO cluster.SparkDeploySchedulerBackend: Granted executor
ID app-20140724162937-0004/4 on hostPort ip-10-5-147-209.ec2.internal:

Re: Starting with spark

2014-07-24 Thread Sean Owen
As was already noted - you have to enable Spark in the Quickstart VM
as it's not on by default. Just choose to start the service in CM.

For anyone that's had any issue with it, please let me know offline.
My experience has been that it works out of the box. There's likely a
simple resolution, or else something that can be fixed in the VM.

On Thu, Jul 24, 2014 at 6:26 PM, Kostiantyn Kudriavtsev
 wrote:
> Hi Sam,
>
> I tried Spark on Cloudera a couple of months ago, and there were a lot of
> issues… Fortunately, I was able to switch to Hortonworks and everything works
> perfectly. In general, you can try two modes: standalone and via YARN.
> Personally, I found using Spark via YARN more comfortable, especially for
> administration. You can read about my experience w/ standalone mode:
> http://simpletoad.blogspot.com/2014/04/spark-on-hdp2.html
>
> On Jul 24, 2014, at 8:12 PM, Jerry  wrote:
>
> Hi Sameer,
>
> I think it is much easier to start using Spark in standalone mode on a
> single machine. Last time I tried cloudera manager to deploy spark, it
> wasn't very straight forward and I hit couple of obstacles along the way.
> However, standalone mode is very easy to start exploring spark.
>
> Best Regards,
>
> Jerry
>
> Sent from my iPad
>
> On Jul 24, 2014, at 6:53 AM, Sameer Sayyed  wrote:
>
> Hello All,
>
> I am new user of spark, I am using cloudera-quickstart-vm-5.0.0-0-vmware for
> execute sample examples of Spark.
> I am very sorry for silly and basic question.
> I am not able to deploy and execute sample examples of spark.
>
> please suggest me how to start with spark.
>
> Please help me
> Thanks in advance.
>
> Regards,
> Sam
>
>


Re: akka 2.3.x?

2014-07-24 Thread Matei Zaharia
This is being tracked here: https://issues.apache.org/jira/browse/SPARK-1812, 
since it will also be needed for cross-building with Scala 2.11. Maybe we can 
do it before that. Probably too late for 1.1, but you should open an issue for 
1.2.

In that JIRA I linked, there's a pull request from a month ago that adds Akka 
2.3, so that's worth looking at.

Matei

On Jul 24, 2014, at 10:19 AM, yardena  wrote:

> We are also eagerly waiting for akka 2.3.4 support as we use Akka (and Spray)
> directly in addition to Spark.
> 
>  Yardena
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/akka-2-3-x-tp10513p10597.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.



rdd.saveAsTextFile blows up

2014-07-24 Thread Eric Friedman
I'm trying to run a simple pipeline using PySpark, version 1.0.1

I've created an RDD over a parquetFile and am mapping the contents with a
transformer function and now wish to write the data out to HDFS.

All of the executors fail with the same stack trace (below)

I do get a directory on HDFS, but it's empty except for a file named
_temporary.

Any ideas?

java.io.IOException (java.io.IOException: Filesystem closed)
org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:629)
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:735)
org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:793)
java.io.DataInputStream.readFully(DataInputStream.java:195)
java.io.DataInputStream.readFully(DataInputStream.java:169)
parquet.hadoop.ParquetFileReader$Chunk.<init>(ParquetFileReader.java:369)
parquet.hadoop.ParquetFileReader$Chunk.<init>(ParquetFileReader.java:362)
parquet.hadoop.ParquetFileReader.readColumnChunkPages(ParquetFileReader.java:411)
parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:349)
parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:100)
parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:966)
scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:293)
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)


Re: Starting with spark

2014-07-24 Thread Kostiantyn Kudriavtsev
Hi Sam,

I tried Spark on Cloudera a couple of months ago, and there were a lot of issues…
Fortunately, I was able to switch to Hortonworks and everything works perfectly. In
general, you can try two modes: standalone and via YARN. Personally, I found
using Spark via YARN more comfortable, especially for administration. You can read
about my experience w/ standalone mode:
http://simpletoad.blogspot.com/2014/04/spark-on-hdp2.html

On Jul 24, 2014, at 8:12 PM, Jerry  wrote:

> Hi Sameer,
> 
> I think it is much easier to start using Spark in standalone mode on a single 
> machine. Last time I tried cloudera manager to deploy spark, it wasn't very 
> straight forward and I hit couple of obstacles along the way. However, 
> standalone mode is very easy to start exploring spark.
> 
> Best Regards,
> 
> Jerry
> 
> Sent from my iPad
> 
> On Jul 24, 2014, at 6:53 AM, Sameer Sayyed  wrote:
> 
>> Hello All,
>> 
>> I am new user of spark, I am using cloudera-quickstart-vm-5.0.0-0-vmware for 
>> execute sample examples of Spark. 
>> I am very sorry for silly and basic question.
>> I am not able to deploy and execute sample examples of spark.
>> 
>> please suggest me how to start with spark.
>> 
>> Please help me
>> Thanks in advance.
>> 
>> Regards,  
>> Sam



Re: akka 2.3.x?

2014-07-24 Thread yardena
We are also eagerly waiting for akka 2.3.4 support as we use Akka (and Spray)
directly in addition to Spark.

  Yardena



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/akka-2-3-x-tp10513p10597.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Starting with spark

2014-07-24 Thread Jerry
Hi Sameer,

I think it is much easier to start using Spark in standalone mode on a single
machine. Last time I tried Cloudera Manager to deploy Spark, it wasn't very
straightforward and I hit a couple of obstacles along the way. However,
standalone mode is a very easy way to start exploring Spark.

Best Regards,

Jerry

Sent from my iPad

> On Jul 24, 2014, at 6:53 AM, Sameer Sayyed  wrote:
> 
> Hello All,
> 
> I am new user of spark, I am using cloudera-quickstart-vm-5.0.0-0-vmware for 
> execute sample examples of Spark. 
> I am very sorry for silly and basic question.
> I am not able to deploy and execute sample examples of spark.
> 
> please suggest me how to start with spark.
> 
> Please help me
> Thanks in advance.
> 
> Regards,  
> Sam


Re: Configuring Spark Memory

2014-07-24 Thread Aaron Davidson
Whoops, I was mistaken in my original post last year. By default, there is
one executor per node per Spark Context, as you said.
"spark.executor.memory" is the amount of memory that the application
requests for each of its executors. SPARK_WORKER_MEMORY is the amount of
memory a Spark Worker is willing to allocate in executors.

So if you were to set SPARK_WORKER_MEMORY to 8g everywhere on your cluster,
and spark.executor.memory to 4g, you would be able to run 2 simultaneous
Spark Contexts who get 4g per node. Similarly, if spark.executor.memory
were 8g, you could only run 1 Spark Context at a time on the cluster, but
it would get all the cluster's memory.
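
To make that concrete, a sketch with the same illustrative numbers
(SPARK_WORKER_MEMORY goes in conf/spark-env.sh on each worker;
spark.executor.memory is set per application):

import org.apache.spark.{SparkConf, SparkContext}

// conf/spark-env.sh on every worker (offers up to 8g of executor memory in total):
//   export SPARK_WORKER_MEMORY=8g

// In each application (hypothetical app name), ask for 4g per executor, so two
// such applications can run on an 8g worker at the same time:
val conf = new SparkConf()
  .setAppName("MemoryConfigExample")
  .set("spark.executor.memory", "4g")
val sc = new SparkContext(conf)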


On Thu, Jul 24, 2014 at 7:25 AM, Martin Goodson 
wrote:

> Thank you Nishkam,
> I have read your code. So, for the sake of my understanding, it seems that
> for each spark context there is one executor per node? Can anyone confirm
> this?
>
>
> --
> Martin Goodson  |  VP Data Science
> (0)20 3397 1240
> [image: Inline image 1]
>
>
> On Thu, Jul 24, 2014 at 6:12 AM, Nishkam Ravi  wrote:
>
>> See if this helps:
>>
>> https://github.com/nishkamravi2/SparkAutoConfig/
>>
>> It's a very simple tool for auto-configuring default parameters in Spark.
>> Takes as input high-level parameters (like number of nodes, cores per node,
>> memory per node, etc) and spits out default configuration, user advice and
>> command line. Compile (javac SparkConfigure.java) and run (java
>> SparkConfigure).
>>
>> Also cc'ing dev in case others are interested in helping evolve this over
>> time (by refining the heuristics and adding more parameters).
>>
>>
>>  On Wed, Jul 23, 2014 at 8:31 AM, Martin Goodson 
>> wrote:
>>
>>> Thanks Andrew,
>>>
>>> So if there is only one SparkContext there is only one executor per
>>> machine? This seems to contradict Aaron's message from the link above:
>>>
>>> "If each machine has 16 GB of RAM and 4 cores, for example, you might
>>> set spark.executor.memory between 2 and 3 GB, totaling 8-12 GB used by
>>> Spark.)"
>>>
>>> Am I reading this incorrectly?
>>>
>>> Anyway our configuration is 21 machines (one master and 20 slaves) each
>>> with 60Gb. We would like to use 4 cores per machine. This is pyspark so we
>>> want to leave say 16Gb on each machine for python processes.
>>>
>>> Thanks again for the advice!
>>>
>>>
>>>
>>> --
>>> Martin Goodson  |  VP Data Science
>>> (0)20 3397 1240
>>> [image: Inline image 1]
>>>
>>>
>>> On Wed, Jul 23, 2014 at 4:19 PM, Andrew Ash 
>>> wrote:
>>>
 Hi Martin,

 In standalone mode, each SparkContext you initialize gets its own set
 of executors across the cluster.  So for example if you have two shells
 open, they'll each get two JVMs on each worker machine in the cluster.

 As far as the other docs, you can configure the total number of cores
 requested for the SparkContext, the amount of memory for the executor JVM
 on each machine, the amount of memory for the Master/Worker daemons (little
 needed since work is done in executors), and several other settings.

 Which of those are you interested in?  What spec hardware do you have
 and how do you want to configure it?

 Andrew


 On Wed, Jul 23, 2014 at 6:10 AM, Martin Goodson 
 wrote:

> We are having difficulties configuring Spark, partly because we still
> don't understand some key concepts. For instance, how many executors are
> there per machine in standalone mode? This is after having closely
> read the documentation several times:
>
> http://spark.apache.org/docs/latest/configuration.html
> http://spark.apache.org/docs/latest/spark-standalone.html
> http://spark.apache.org/docs/latest/tuning.html
> http://spark.apache.org/docs/latest/cluster-overview.html
>
> The cluster overview has some information here about executors but is
> ambiguous about whether there are single executors or multiple executors 
> on
> each machine.
>
>  This message from Aaron Davidson implies that the executor memory
> should be set to total available memory on the machine divided by the
> number of cores:
> http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANGvG8o5K1SxgnFMT_9DK=vj_plbve6zh_dn5sjwpznpbcp...@mail.gmail.com%3E
>
> But other messages imply that the executor memory should be set to the
> *total* available memory of each machine.
>
> We would very much appreciate some clarity on this and the myriad of
> other memory settings available (daemon memory, worker memory etc). 

Re: Configuring Spark Memory

2014-07-24 Thread Martin Goodson
Great - thanks for the clarification Aaron. The offer stands for me to
write some documentation and an example that covers this without leaving
*any* room for ambiguity.




-- 
Martin Goodson  |  VP Data Science
(0)20 3397 1240
[image: Inline image 1]


On Thu, Jul 24, 2014 at 6:09 PM, Aaron Davidson  wrote:

> Whoops, I was mistaken in my original post last year. By default, there is
> one executor per node per Spark Context, as you said.
> "spark.executor.memory" is the amount of memory that the application
> requests for each of its executors. SPARK_WORKER_MEMORY is the amount of
> memory a Spark Worker is willing to allocate in executors.
>
> So if you were to set SPARK_WORKER_MEMORY to 8g everywhere on your
> cluster, and spark.executor.memory to 4g, you would be able to run 2
> simultaneous Spark Contexts who get 4g per node. Similarly, if
> spark.executor.memory were 8g, you could only run 1 Spark Context at a time
> on the cluster, but it would get all the cluster's memory.
>
>
> On Thu, Jul 24, 2014 at 7:25 AM, Martin Goodson 
> wrote:
>
>> Thank you Nishkam,
>> I have read your code. So, for the sake of my understanding, it seems
>> that for each spark context there is one executor per node? Can anyone
>> confirm this?
>>
>>
>> --
>> Martin Goodson  |  VP Data Science
>> (0)20 3397 1240
>> [image: Inline image 1]
>>
>>
>> On Thu, Jul 24, 2014 at 6:12 AM, Nishkam Ravi  wrote:
>>
>>> See if this helps:
>>>
>>> https://github.com/nishkamravi2/SparkAutoConfig/
>>>
>>> It's a very simple tool for auto-configuring default parameters in
>>> Spark. Takes as input high-level parameters (like number of nodes, cores
>>> per node, memory per node, etc) and spits out default configuration, user
>>> advice and command line. Compile (javac SparkConfigure.java) and run (java
>>> SparkConfigure).
>>>
>>> Also cc'ing dev in case others are interested in helping evolve this
>>> over time (by refining the heuristics and adding more parameters).
>>>
>>>
>>>  On Wed, Jul 23, 2014 at 8:31 AM, Martin Goodson 
>>> wrote:
>>>
 Thanks Andrew,

 So if there is only one SparkContext there is only one executor per
 machine? This seems to contradict Aaron's message from the link above:

 "If each machine has 16 GB of RAM and 4 cores, for example, you might
 set spark.executor.memory between 2 and 3 GB, totaling 8-12 GB used by
 Spark.)"

 Am I reading this incorrectly?

 Anyway our configuration is 21 machines (one master and 20 slaves) each
 with 60Gb. We would like to use 4 cores per machine. This is pyspark so we
 want to leave say 16Gb on each machine for python processes.

 Thanks again for the advice!



 --
 Martin Goodson  |  VP Data Science
 (0)20 3397 1240
 [image: Inline image 1]


 On Wed, Jul 23, 2014 at 4:19 PM, Andrew Ash 
 wrote:

> Hi Martin,
>
> In standalone mode, each SparkContext you initialize gets its own set
> of executors across the cluster.  So for example if you have two shells
> open, they'll each get two JVMs on each worker machine in the cluster.
>
> As far as the other docs, you can configure the total number of cores
> requested for the SparkContext, the amount of memory for the executor JVM
> on each machine, the amount of memory for the Master/Worker daemons 
> (little
> needed since work is done in executors), and several other settings.
>
> Which of those are you interested in?  What spec hardware do you have
> and how do you want to configure it?
>
> Andrew
>
>
> On Wed, Jul 23, 2014 at 6:10 AM, Martin Goodson 
> wrote:
>
>> We are having difficulties configuring Spark, partly because we still
>> don't understand some key concepts. For instance, how many executors are
>> there per machine in standalone mode? This is after having closely
>> read the documentation several times:
>>
>> http://spark.apache.org/docs/latest/configuration.html
>> http://spark.apache.org/docs/latest/spark-standalone.html
>> http://spark.apache.org/docs/latest/tuning.html
>> http://spark.apache.org/docs/latest/cluster-overview.html
>>
>> The cluster overview has some information here about executors but is
>> ambiguous about whether there are single executors or multiple executors 
>> on
>> each machine.
>>
>>  This message from Aaron Davidson implies that the executor memory
>> should be set to total available memory on the machine divided by the
>> number of cores:
>> http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANGvG8o5K1SxgnFMT_9DK=vj_plbve6zh_dn5sjwpznpbcp...@ma

Re: Streaming. Cannot get socketTextStream to receive anything.

2014-07-24 Thread Tathagata Das
You will have to define your own stream-to-iterator function and use
socketStream. The function should return your custom-delimited objects as the
bytes continuously come in. When data is insufficient, the function should
block.
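
A minimal sketch of such a converter, assuming ETX (0x03) as the record
delimiter (the converter name is made up; plug it into
ssc.socketStream(host, port, etxDelimited, storageLevel)):

import java.io.{BufferedInputStream, InputStream}

def etxDelimited(in: InputStream): Iterator[String] = {
  val bis = new BufferedInputStream(in)
  new Iterator[String] {
    private var buffered: Option[String] = None
    private def readRecord(): Option[String] = {
      val sb = new StringBuilder
      var b = bis.read()                 // blocks while data is insufficient
      while (b != -1 && b != 0x03) {     // 0x03 = ETX delimiter
        sb.append(b.toChar)
        b = bis.read()
      }
      if (b == -1 && sb.isEmpty) None else Some(sb.toString)
    }
    def hasNext: Boolean = {
      if (buffered.isEmpty) buffered = readRecord()
      buffered.isDefined
    }
    def next(): String = {
      if (!hasNext) throw new NoSuchElementException("end of stream")
      val r = buffered.get; buffered = None; r
    }
  }
}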

TD
On Jul 23, 2014 6:52 PM, "kytay"  wrote:

> Hi TD
>
> You are right, I did not include "\n" to delimit the string flushed. That's
> the reason.
>
> Is there a way for me to define the delimiter? Like SOH or ETX instead of
> "\n"
>
> Regards
> kytay
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Solved-Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p10558.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: GraphX Pragel implementation

2014-07-24 Thread Arun Kumar
Hi

While using the Pregel API for iterations, how do I figure out which superstep
the iteration is currently in?

Regards
Arun


On Thu, Jul 17, 2014 at 4:24 PM, Arun Kumar  wrote:

> Hi
>
>
>
> I am trying to implement the belief propagation algorithm in GraphX using
> the Pregel API.
>
> def pregel[A]
>   (initialMsg: A,
>    maxIter: Int = Int.MaxValue,
>    activeDir: EdgeDirection = EdgeDirection.Out)
>   (vprog: (VertexId, VD, A) => VD,
>    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
>    mergeMsg: (A, A) => A)
>
> In this, can we create messages in the vprog function (from incoming
> messages) and send them using sendMsg?
>
>
>
> Regards
> Arun
>
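
Pregel doesn't expose the superstep number to vprog directly; one common
workaround (a rough sketch below, with an assumed Graph[Double, Int] and
made-up compute logic) is to carry a counter in the vertex attribute and bump
it each time vprog runs:

import org.apache.spark.graphx._

// graph: Graph[Double, Int] is assumed to already exist; wrap each vertex
// value in a (value, timesVprogRan) pair so vprog can see its own counter.
def withSuperstepCounter(graph: Graph[Double, Int]): Graph[(Double, Int), Int] = {
  val initial = graph.mapVertices((_, v) => (v, 0))
  initial.pregel(Double.NegativeInfinity, 10)(
    // vprog: runs once per superstep for every vertex that received a message
    (id, attr, msg) => (math.max(attr._1, msg), attr._2 + 1),
    // sendMsg: messages are built from the (already updated) vertex attributes
    triplet =>
      if (triplet.srcAttr._1 > triplet.dstAttr._1) Iterator((triplet.dstId, triplet.srcAttr._1))
      else Iterator.empty,
    // mergeMsg
    (a, b) => math.max(a, b)
  )
}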


Re: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext

2014-07-24 Thread lihu
Which code did you use? Is the problem caused by your own code or by something
in Spark itself?


On Tue, Jul 22, 2014 at 8:50 AM, hsy...@gmail.com  wrote:

> I have the same problem
>
>
> On Sat, Jul 19, 2014 at 12:31 AM, lihu  wrote:
>
>> Hi,
>> Everyone.  I have a piece of following code. When I run it,
>> it occurred the error just like below, it seem that the SparkContext is not
>> serializable, but i do not try to use the SparkContext except the broadcast.
>> [In fact, this code is in the MLLib, I just try to broadcast the
>>  centerArrays ]
>>
>> it can success in the redeceBykey operation, but failed at the
>> collect operation, this confused me.
>>
>>
>> INFO DAGScheduler: Failed to run collect at KMeans.scala:235
>> [error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task
>> not serializable: java.io.NotSerializableException:
>> org.apache.spark.SparkContext
>> org.apache.spark.SparkException: Job aborted: Task not serializable:
>> java.io.NotSerializableException: org.apache.spark.SparkContext
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>>  at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>>
>>
>>
>>
>> private def initKMeansParallel(data: RDD[Array[Double]]):
>> Array[ClusterCenters] = {
>>
>> @transient val sc = data.sparkContext   // I try to add the 
>> transient
>> annotation here, but it doesn't work
>>
>> // Initialize each run's center to a random point
>> val seed = new XORShiftRandom().nextInt()
>> val sample = data.takeSample(true, runs, seed).toSeq
>> val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))
>>
>> // On each step, sample 2 * k points on average for each run with
>> probability proportional
>> // to their squared distance from that run's current centers
>> for (step <- 0 until initializationSteps) {
>>   val centerArrays = sc.broadcast(centers.map(_.toArray))
>>   val sumCosts = data.flatMap { point =>
>> for (r <- 0 until runs) yield (r,
>> KMeans.pointCost(centerArrays.value(r), point))
>>   }.reduceByKey(_ + _).collectAsMap()
>>   //can pass at this point
>>   val chosen = data.mapPartitionsWithIndex { (index, points) =>
>> val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
>> for {
>>   p <- points
>>   r <- 0 until runs
>>   if rand.nextDouble() < KMeans.pointCost(centerArrays.value(r),
>> p) * 2 * k / sumCosts(r)
>> } yield (r, p)
>>   }.collect()
>>   // failed at this
>> point.
>>   for ((r, p) <- chosen) {
>> centers(r) += p
>>   }
>> }
>>
>>
>>
>>
>>
>


-- 
*Best Wishes!*

 *Li Hu(李浒) | Graduate Student*

*Institute for Interdisciplinary Information Sciences(IIIS
) *
*Tsinghua University, China*

*Email: lihu...@gmail.com *
*Tel  : +86 15120081920*
*Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
*


Spark got stuck with a loop

2014-07-24 Thread Denis RP
Hi,
I ran spark standalone mode on a cluster and it went well for approximately
one hour, then the driver's output stopped with the following:

14/07/24 08:07:36 INFO MapOutputTrackerMasterActor: Asked to send map output
locations for shuffle 36 to spark@worker5.local:47416
14/07/24 08:07:36 INFO MapOutputTrackerMaster: Size of output statuses for
shuffle 36 is 265 bytes
14/07/24 08:30:04 INFO MapOutputTrackerMasterActor: Asked to send map output
locations for shuffle 39 to spark@worker5.local:47416
14/07/24 08:30:04 INFO MapOutputTrackerMaster: Size of output statuses for
shuffle 39 is 265 bytes

Then I checked the Spark UI and found only one active task. I then checked that
worker's stderr; it seemed the worker had fallen into a loop:

14/07/24 09:18:18 INFO BlockManager: Found block rdd_14_3 locally
14/07/24 09:18:18 INFO BlockManager: Found block rdd_14_3 locally
14/07/24 09:18:18 INFO BlockManager: Found block rdd_14_3 locally
14/07/24 09:18:18 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/07/24 09:18:18 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 0 non-empty blocks out of 28 blocks
14/07/24 09:18:18 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 0 remote fetches in 0 ms

This aberrant output was repeated over and over.

So, what should I do to fix it? I have run the program multiple times
and sooner or later it ends up in this state. I also tried increasing the
memory, which didn't help.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-got-stuck-with-a-loop-tp10590.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Starting with spark

2014-07-24 Thread Marco Shaw
First thing...  Go into the Cloudera Manager and make sure that the Spark
service (master?) is started.

Marco


On Thu, Jul 24, 2014 at 7:53 AM, Sameer Sayyed 
wrote:

> Hello All,
>
> I am new user of spark, I am using *cloudera-quickstart-vm-5.0.0-0-vmware*
> for execute sample examples of Spark.
> I am very sorry for silly and basic question.
> I am not able to deploy and execute sample examples of spark.
>
> please suggest me *how to start with spark*.
>
> Please help me
> Thanks in advance.
>
> Regards,
> Sam
>


GraphX for pyspark?

2014-07-24 Thread Eric Friedman
I understand that GraphX is not yet available for pyspark.  I was wondering
if the Spark team has set a target release and timeframe for doing that
work?

Thank you,
Eric


Re: Spark Function setup and cleanup

2014-07-24 Thread Yosi Botzer
In my case I want to reach HBase. For every record with a userId I want to
get some extra information about the user and add it to the result record for
further processing.
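
Not something suggested in this thread, but one common way to get
setup()/cleanup()-style behaviour per partition is mapPartitions; a rough
sketch (UserDb, lookupUser and enrich are hypothetical stand-ins for the
actual HBase client code):

val enriched = records.mapPartitions { iter =>
  val conn = UserDb.connect()                 // "setup": once per partition
  val out = iter.map { record =>
    val extra = conn.lookupUser(record.userId)
    record.enrich(extra)                      // add the extra info to the record
  }.toList                                    // force the iterator so conn is still open
  conn.close()                                // "cleanup": once per partition
  out.iterator
}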


On Thu, Jul 24, 2014 at 9:11 AM, Yanbo Liang  wrote:

> If you want to connect to DB in program, you can use JdbcRDD (
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
> )
>
>
> 2014-07-24 18:32 GMT+08:00 Yosi Botzer :
>
> Hi,
>>
>> I am using the Java api of Spark.
>>
>> I wanted to know if there is a way to run some code in a manner that is
>> like the setup() and cleanup() methods of Hadoop Map/Reduce
>>
>> The reason I need it is because I want to read something from the DB
>> according to each record I scan in my Function, and I would like to open
>> the DB connection only once (and close it only once).
>>
>> Thanks
>>
>
>


Re: Configuring Spark Memory

2014-07-24 Thread Martin Goodson
Thank you Nishkam,
I have read your code. So, for the sake of my understanding, it seems that
for each spark context there is one executor per node? Can anyone confirm
this?


-- 
Martin Goodson  |  VP Data Science
(0)20 3397 1240
[image: Inline image 1]


On Thu, Jul 24, 2014 at 6:12 AM, Nishkam Ravi  wrote:

> See if this helps:
>
> https://github.com/nishkamravi2/SparkAutoConfig/
>
> It's a very simple tool for auto-configuring default parameters in Spark.
> Takes as input high-level parameters (like number of nodes, cores per node,
> memory per node, etc) and spits out default configuration, user advice and
> command line. Compile (javac SparkConfigure.java) and run (java
> SparkConfigure).
>
> Also cc'ing dev in case others are interested in helping evolve this over
> time (by refining the heuristics and adding more parameters).
>
>
>  On Wed, Jul 23, 2014 at 8:31 AM, Martin Goodson 
> wrote:
>
>> Thanks Andrew,
>>
>> So if there is only one SparkContext there is only one executor per
>> machine? This seems to contradict Aaron's message from the link above:
>>
>> "If each machine has 16 GB of RAM and 4 cores, for example, you might set
>> spark.executor.memory between 2 and 3 GB, totaling 8-12 GB used by Spark.)"
>>
>> Am I reading this incorrectly?
>>
>> Anyway our configuration is 21 machines (one master and 20 slaves) each
>> with 60Gb. We would like to use 4 cores per machine. This is pyspark so we
>> want to leave say 16Gb on each machine for python processes.
>>
>> Thanks again for the advice!
>>
>>
>>
>> --
>> Martin Goodson  |  VP Data Science
>> (0)20 3397 1240
>> [image: Inline image 1]
>>
>>
>> On Wed, Jul 23, 2014 at 4:19 PM, Andrew Ash  wrote:
>>
>>> Hi Martin,
>>>
>>> In standalone mode, each SparkContext you initialize gets its own set of
>>> executors across the cluster.  So for example if you have two shells open,
>>> they'll each get two JVMs on each worker machine in the cluster.
>>>
>>> As far as the other docs, you can configure the total number of cores
>>> requested for the SparkContext, the amount of memory for the executor JVM
>>> on each machine, the amount of memory for the Master/Worker daemons (little
>>> needed since work is done in executors), and several other settings.
>>>
>>> Which of those are you interested in?  What spec hardware do you have
>>> and how do you want to configure it?
>>>
>>> Andrew
>>>
>>>
>>> On Wed, Jul 23, 2014 at 6:10 AM, Martin Goodson 
>>> wrote:
>>>
 We are having difficulties configuring Spark, partly because we still
 don't understand some key concepts. For instance, how many executors are
 there per machine in standalone mode? This is after having closely
 read the documentation several times:

 http://spark.apache.org/docs/latest/configuration.html
 http://spark.apache.org/docs/latest/spark-standalone.html
 http://spark.apache.org/docs/latest/tuning.html
 http://spark.apache.org/docs/latest/cluster-overview.html

 The cluster overview has some information here about executors but is
 ambiguous about whether there are single executors or multiple executors on
 each machine.

  This message from Aaron Davidson implies that the executor memory
 should be set to total available memory on the machine divided by the
 number of cores:
 http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANGvG8o5K1SxgnFMT_9DK=vj_plbve6zh_dn5sjwpznpbcp...@mail.gmail.com%3E

 But other messages imply that the executor memory should be set to the
 *total* available memory of each machine.

 We would very much appreciate some clarity on this and the myriad of
 other memory settings available (daemon memory, worker memory etc). Perhaps
 a worked example could be added to the docs? I would be happy to provide
 some text as soon as someone can enlighten me on the technicalities!

 Thank you

 --
 Martin Goodson  |  VP Data Science
 (0)20 3397 1240
 [image: Inline image 1]

>>>
>>>
>>
>


Re: new error for me

2014-07-24 Thread phoenix bai
I am currently facing the same problem. Error snapshot below:

14-07-24 19:15:30 WARN [pool-3-thread-1] SendingConnection: Error
finishing connection to r64b22034.tt.net/10.148.129.84:47525
java.net.ConnectException: Connection timed out
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:599)
at 
org.apache.spark.network.SendingConnection.finishConnect(Connection.scala:318)
at 
org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
14-07-24 19:15:30 INFO [pool-3-thread-1] ConnectionManager: Handling
connection error on connection to
ConnectionManagerId(r64b22034.tt.net,47525)
14-07-24 19:15:30 INFO [pool-3-thread-1] ConnectionManager: Removing
SendingConnection to ConnectionManagerId(r64b22034.tt.net,47525)
14-07-24 19:15:30 INFO [pool-3-thread-1] ConnectionManager: Notifying
org.apache.spark.network.ConnectionManager$MessageStatus@1704ebb


Could anyone help shed some light on this?


thanks




On Tue, Jul 22, 2014 at 11:35 AM, Nathan Kronenfeld <
nkronenf...@oculusinfo.com> wrote:

> Does anyone know what this error means:
> 14/07/21 23:07:22 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
> 14/07/21 23:07:22 INFO TaskSetManager: Starting task 3.0:0 as TID 1620 on
> executor 27: r104u05.oculus.local (PROCESS_LOCAL)
> 14/07/21 23:07:22 INFO TaskSetManager: Serialized task 3.0:0 as 8620 bytes
> in 1 ms
> 14/07/21 23:07:36 INFO BlockManagerInfo: Added taskresult_1620 in memory
> on r104u05.oculus.local:50795 (size: 64.9 MB, free: 18.3 GB)
> 14/07/21 23:07:36 INFO SendingConnection: Initiating connection to
> [r104u05.oculus.local/192.168.0.105:50795]
> 14/07/21 23:07:57 INFO ConnectionManager: key already cancelled ?
> sun.nio.ch.SelectionKeyImpl@1d86a150
> java.nio.channels.CancelledKeyException
> at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
> at sun.nio.ch.SelectionKeyImpl.interestOps(SelectionKeyImpl.java:77)
> at
> org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:265)
> at
> org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:115)
> 14/07/21 23:07:57 WARN SendingConnection: Error finishing connection to
> r104u05.oculus.local/192.168.0.105:50795
> java.net.ConnectException: Connection timed out
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735)
> at
> org.apache.spark.network.SendingConnection.finishConnect(Connection.scala:318)
> at
> org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:202)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:724)
> 14/07/21 23:07:57 INFO ConnectionManager: Handling connection error on
> connection to ConnectionManagerId(r104u05.oculus.local,50795)
> 14/07/21 23:07:57 INFO ConnectionManager: Removing SendingConnection to
> ConnectionManagerId(r104u05.oculus.local,50795)
> 14/07/21 23:07:57 INFO ConnectionManager: Notifying
> org.apache.spark.network.ConnectionManager$MessageStatus@13ad274d
> 14/07/21 23:07:57 INFO ConnectionManager: Handling connection error on
> connection to ConnectionManagerId(r104u05.oculus.local,50795)
> 14/07/21 23:07:57 INFO ConnectionManager: Removing SendingConnection to
> ConnectionManagerId(r104u05.oculus.local,50795)
> 14/07/21 23:07:57 INFO ConnectionManager: Removing SendingConnection to
> ConnectionManagerId(r104u05.oculus.local,50795)
> 14/07/21 23:07:57 WARN TaskSetManager: Lost TID 1620 (task 3.0:0)
> 14/07/21 23:07:57 WARN TaskSetManager: Lost result for TID 1620 on host
> r104u05.oculus.local
>
> I've never seen this one before, and now it's coming up consistently.
>
> Thanks,
>  -Nathan
>
>


Re: Spark Function setup and cleanup

2014-07-24 Thread Yanbo Liang
If you want to connect to DB in program, you can use JdbcRDD (
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
)
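
A rough usage sketch (the JDBC URL, table and column names below are made up;
the query needs the two '?' placeholders for the partition bounds):

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

val users = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:mysql://dbhost/app", "user", "pass"),
  "SELECT user_id, name FROM users WHERE user_id >= ? AND user_id <= ?",
  1L, 1000000L, 10,                                      // bounds and number of partitions
  (rs: ResultSet) => (rs.getLong(1), rs.getString(2)))
// users is an RDD[(Long, String)] that can then be joined against the main data.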


2014-07-24 18:32 GMT+08:00 Yosi Botzer :

> Hi,
>
> I am using the Java api of Spark.
>
> I wanted to know if there is a way to run some code in a manner that is
> like the setup() and cleanup() methods of Hadoop Map/Reduce
>
> The reason I need it is because I want to read something from the DB
> according to each record I scan in my Function, and I would like to open
> the DB connection only once (and close it only once).
>
> Thanks
>


Re: save to HDFS

2014-07-24 Thread lmk
Thanks Akhil.
I was able to view the files. Actually I was trying to list the same using
regular ls and since it did not show anything I was concerned.
Thanks for showing me the right direction.

Regards,
lmk



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/save-to-HDFS-tp10578p10583.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: save to HDFS

2014-07-24 Thread Akhil Das
This piece of code

saveAsHadoopFile[TextOutputFormat[NullWritable,Text]]("hdfs://
masteripaddress:9000/root/test-app/test1/")

Saves the RDD into HDFS, and yes you can physically see the files using the
hadoop command (hadoop fs -ls /root/test-app/test1 - yes, you need to log in
to the cluster). If you are not able to execute the command (e.g. hadoop
command not found), you can run $HADOOP_HOME/bin/hadoop fs -ls
/root/test-app/test1
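
And one quick way to sanity-check it from Spark itself (same path as above;
part-* matches the per-partition files saveAsHadoopFile writes):

val saved = sc.textFile("hdfs://masteripaddress:9000/root/test-app/test1/part-*")
saved.take(5).foreach(println)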



Thanks
Best Regards


On Thu, Jul 24, 2014 at 4:34 PM, lmk 
wrote:

> Hi Akhil,
> I am sure that the RDD that I saved is not empty. I have tested it using
> take.
> But is there no way that I can see this saved physically like we do in the
> normal context? Can't I view this folder as I am already logged into the
> cluster?
> And, should I run hadoop fs -ls
> hdfs://masteripaddress:9000/root/test-app/test1/
> after I login to the cluster?
>
> Thanks,
> lmk
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/save-to-HDFS-tp10578p10581.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: save to HDFS

2014-07-24 Thread lmk
Hi Akhil,
I am sure that the RDD that I saved is not empty. I have tested it using
take.
But is there no way that I can see this saved physically like we do in the
normal context? Can't I view this folder as I am already logged into the
cluster?
And, should I run hadoop fs -ls
hdfs://masteripaddress:9000/root/test-app/test1/
after I login to the cluster?

Thanks,
lmk



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/save-to-HDFS-tp10578p10581.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Starting with spark

2014-07-24 Thread Akhil Das
Here's the complete overview http://spark.apache.org/docs/latest/

And Here's the quick start guidelines
http://spark.apache.org/docs/latest/quick-start.html

I would suggest downloading the Spark pre-compiled binaries and starting off
yourself.
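
Once the pre-built package is unpacked, bin/spark-shell gives you an
interactive shell with a SparkContext (sc) already created; a first sanity
check might look like:

val nums = sc.parallelize(1 to 1000)
println(nums.filter(_ % 2 == 0).count())   // should print 500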

Thanks
Best Regards


On Thu, Jul 24, 2014 at 4:23 PM, Sameer Sayyed 
wrote:

> Hello All,
>
> I am new user of spark, I am using *cloudera-quickstart-vm-5.0.0-0-vmware*
> for execute sample examples of Spark.
> I am very sorry for silly and basic question.
> I am not able to deploy and execute sample examples of spark.
>
> please suggest me *how to start with spark*.
>
> Please help me
> Thanks in advance.
>
> Regards,
> Sam
>


Re: save to HDFS

2014-07-24 Thread Akhil Das
Are you sure the RDD that you were saving isn't empty!?

Are you seeing a _SUCCESS file in this location? hdfs://
masteripaddress:9000/root/test-app/test1/
 (Do hadoop fs -ls hdfs://masteripaddress:9000/root/test-app/test1/)


Thanks
Best Regards


On Thu, Jul 24, 2014 at 4:24 PM, lmk 
wrote:

> Hi,
> I have a scala application which I have launched into a spark cluster. I
> have the following statement trying to save to a folder in the master:
> saveAsHadoopFile[TextOutputFormat[NullWritable,
> Text]]("hdfs://masteripaddress:9000/root/test-app/test1/")
>
> The application is executed successfully and log says that save is complete
> also. But I am not able to find the file I have saved anywhere. Is there a
> way I can access this file?
>
> Pls advice.
>
> Regards,
> lmk
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/save-to-HDFS-tp10578.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


save to HDFS

2014-07-24 Thread lmk
Hi,
I have a scala application which I have launched into a spark cluster. I
have the following statement trying to save to a folder in the master:
saveAsHadoopFile[TextOutputFormat[NullWritable,
Text]]("hdfs://masteripaddress:9000/root/test-app/test1/")

The application is executed successfully and log says that save is complete
also. But I am not able to find the file I have saved anywhere. Is there a
way I can access this file?

Pls advice.

Regards,
lmk



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/save-to-HDFS-tp10578.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Starting with spark

2014-07-24 Thread Sameer Sayyed
Hello All,

I am a new user of Spark. I am using cloudera-quickstart-vm-5.0.0-0-vmware
to execute the sample examples of Spark.
I am very sorry for the silly and basic question.
I am not able to deploy and execute the sample examples of Spark.

Please suggest how to start with Spark.

Please help me
Thanks in advance.

Regards,
Sam


Re: streaming sequence files?

2014-07-24 Thread Sean Owen
Can you just call fileStream or textFileStream in the second app, to
consume files that appear in HDFS / Tachyon from the first job?
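
For reference, a rough sketch of the reading side (this assumes the first job
writes its part-* sequence files of (Text, IntWritable) pairs directly into
one directory that the second app monitors; fileStream only picks up files
that appear after the stream starts):

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("WordCountReader"), Seconds(10))

val counts = ssc.fileStream[Text, IntWritable, SequenceFileInputFormat[Text, IntWritable]](
    "tachyon://localhost:19998/files/")
  .map { case (word, count) => (word.toString, count.get) }   // copy out of reused Writables

counts.print()
ssc.start()
ssc.awaitTermination()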

On Thu, Jul 24, 2014 at 2:43 AM, Barnaby  wrote:
> If I save an RDD as a sequence file such as:
>
> val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
> wordCounts.foreachRDD( d => {
> d.saveAsSequenceFile("tachyon://localhost:19998/files/WordCounts-" +
> (new SimpleDateFormat("MMdd-HHmmss") format
> Calendar.getInstance.getTime).toString)
> })
>
> How can I use these results in another Spark app since there is no
> StreamingContext.sequenceFileStream()?
>
> Or,
>
> What is the best way to save RDDs of objects to files in one streaming app
> so that another app can stream those files in? Basically, reuse partially
> reduced RDDs for further processing so that it doesn't have to be done more
> than once.
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/streaming-sequence-files-tp10557.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark Function setup and cleanup

2014-07-24 Thread Yosi Botzer
Hi,

I am using the Java api of Spark.

I wanted to know if there is a way to run some code in a manner that is
like the setup() and cleanup() methods of Hadoop Map/Reduce

The reason I need it is because I want to read something from the DB
according to each record I scan in my Function, and I would like to open
the DB connection only once (and close it only once).

Thanks


Re: Help in merging a RDD agaisnt itself using the V of a (K,V).

2014-07-24 Thread Sean Owen
Yeah reduce() will leave you with one big collection of sets on the
driver. Maybe the set of all identifiers isn't so big -- a hundred
million Longs even isn't so much. I'm glad to hear cartesian works but
can that scale? you're making an RDD of N^2 elements initially which
is just vast.

On Thu, Jul 24, 2014 at 2:09 AM, Roch Denis  wrote:
> Ah yes, you're quite right with partitions I could probably process a good
> chunk of the data but I didn't think a reduce would work? Sorry, I'm still
> new to Spark and map reduce in general but I thought that the reduce result
> wasn't an RDD and had to fit into memory. If the result of a reduce can be
> any size, then yes I can see how to make it work.
>
> Sorry for not being certain, the doc is not quite clear on that point, at
> least to me.
>
>
>


spark streaming actor receiver doesn't play well with kryoserializer

2014-07-24 Thread Alan Ngai
It looks like when you configure SparkConf to use the KryoSerializer in 
combination with an ActorReceiver, bad things happen.  I modified the 
ActorWordCount example program from 

val sparkConf = new SparkConf().setAppName("ActorWordCount")

to

val sparkConf = new SparkConf()
  .setAppName("ActorWordCount")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer”)

and I get the stack trace below.  I figured it might be that Kryo doesn’t know 
how to serialize/deserialize the actor so I added a registry.  I also added a 
default empty constructor to SampleActorReceiver just for kicks

class SerializationRegistry extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
kryo.register(classOf[SampleActorReceiver])
  }
}

…

case class SampleActorReceiver[T: ClassTag](var urlOfPublisher: String)
extends Actor with ActorHelper {
  def this() = this("")
  ...
}

...
val sparkConf = new SparkConf()
  .setAppName("ActorWordCount")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", 
"org.apache.spark.examples.streaming.SerializationRegistry")


None of this worked, same stack trace.  Any idea what’s going on?  Is this a 
known issue and is there a workaround?  

14/07/24 02:58:19 [ERROR] OneForOneStrategy: configuration problem while 
creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher 
[akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
 akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
at akka.actor.ActorCell.create(ActorCell.scala:578)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
at akka.dispatch.Mailbox.run(Mailbox.scala:218)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.ConfigurationException: configuration problem while creating 
[akka://spark/user/Supervisor0/SampleReceiver] with dispatcher 
[akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
at 
akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
at 
org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
at 
org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
at 
org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
at akka.actor.Props.newActor(Props.scala:339)
at akka.actor.ActorCell.newActor(ActorCell.scala:534)
at akka.actor.ActorCell.create(ActorCell.scala:560)
... 9 more
Caused by: java.lang.IllegalArgumentException: constructor public 
akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with 
arguments [class java.lang.Class, class 
org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
at akka.util.Reflect$.instantiate(Reflect.scala:69)
at akka.actor.Props.cachedActorClass(Props.scala:203)
at akka.actor.Props.actorClass(Props.scala:327)
at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
... 20 more
Caused by: java.lang.IllegalArgumentException: wrong number of arguments
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
at akka.util.Reflect$.instantiate(Reflect.scala:65)
... 24 more



Re: Kyro deserialisation error

2014-07-24 Thread Guillaume Pitel

Hi,

We've got the same problem here (randomly happens) :

Unable to 
find class: 6  4 ڗ4ڻ 8 &44ں*Q|T4⛇` j4 Ǥ4ꙴg8 
4 ¾4Ú»   4   4Ú» pE4ʽ4ں*WsѴμˁ4ڻ4ʤ4ցbל4ڻ&

4[͝4[ۦ44ڻ!~44ڻΡ4Ƈ4Pҍ4҇Ÿ%Q4ɋ4‚ifj4w4Y4ڻ*¸4☮”R4Ҳ؅”R4X4ڻ
4]5ᴁX^34l[?s4ƾ4ڻ8BH4Z4@4jჴ? 4ڻ 
7B4ٛƒ/v4ꃂE4뿁4J04릁4%44ؕ w\44 
Ӓ¯ٕ4ڻ/lv4ⴁ40喴Ƴ䂁4¸C4P4ڻ _o4lbʂԛ4각 
4^x4ڻ


Clearly a stream corruption problem.

We've been running fine (afaik) on 1.0.0 for two weeks, switched to 1.0.1 
this Monday, and since then this kind of problem randomly occurs.



Guillaume Pitel

Not sure if this helps, but it does seem to be part of a name in a
Wikipedia article, and Wikipedia is the data set. So something is
reading this class name from the data.

http://en.wikipedia.org/wiki/Carl_Fridtjof_Rode

On Thu, Jul 17, 2014 at 9:40 AM, Tathagata Das
 wrote:

Seems like there is some sort of stream corruption, causing Kryo read to
read a weird class name from the stream (the name "arl Fridtjof Rode" in the
exception cannot be a class!).
Not sure how to debug this.

@Patrick: Any idea?



--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. 
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705