Re: S3n, parallelism, partitions

2015-08-17 Thread Akshat Aranya
This will also depend on the file format you are using.

A word of advice: you would be much better off with the s3a file system.
As I found out recently the hard way, s3n has issues: it ends up reading
through entire files even when it is only looking for headers.
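For what it's worth, a minimal sketch of switching to s3a (the bucket name and credentials are placeholders, and it assumes the hadoop-aws and AWS SDK jars are on the classpath):

    // Hypothetical credentials; the fs.s3a.* keys are read by the s3a file
    // system implementation.
    sc.hadoopConfiguration.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")
    val lines = sc.textFile("s3a://bucket_xyz/some_folder_having_many_files_in_it/")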

On Mon, Aug 17, 2015 at 2:10 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 s3n uses the Hadoop API underneath, so I guess it would partition
 according to your Hadoop configuration (128 MB per partition by default).
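 A rough way to check what you actually got (the path is taken from the question below; minPartitions is only a hint for the minimum number of splits and the value here is arbitrary):

    val logs = sc.textFile(
      "s3n://bucket_xyz/some_folder_having_many_files_in_it/", minPartitions = 200)
    println(s"partitions = ${logs.partitions.length}")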

 Thanks
 Best Regards

 On Mon, Aug 17, 2015 at 2:29 PM, matd matd...@gmail.com wrote:

 Hello,

  I would like to understand how the work is parallelized across a Spark
 cluster (and what is left to the driver) when I read several files from a
 single folder in s3
 s3n://bucket_xyz/some_folder_having_many_files_in_it/

  How are files (or file parts) mapped to partitions?

 Thanks
 Mathieu









Re: Accessing S3 files with s3n://

2015-08-10 Thread Akshat Aranya
Hi Jerry, Akhil,

Thanks for your help. With s3n, the entire file is downloaded even while
just creating the RDD with sqlContext.read.parquet().  It seems like even
just opening and closing the InputStream causes the entire data to get
fetched.

As it turned out, I was able to use s3a and avoid this problem.  I was
under the impression that s3a was only meant for using EMRFS, where the
metadata of the FS is kept separately.  This is not true; s3a maps object
keys directly to file names and directories.

On Sun, Aug 9, 2015 at 6:01 AM, Jerry Lam chiling...@gmail.com wrote:

 Hi Akshat,

 Is there a particular reason you don't use s3a? From my experience, s3a
 performs much better than the rest. I believe the inefficiency is from the
 implementation of the s3 interface.

 Best Regards,

 Jerry

 Sent from my iPhone

 On 9 Aug, 2015, at 5:48 am, Akhil Das ak...@sigmoidanalytics.com wrote:

 It depends on which operation you are doing. If you are doing a .count() on a
 Parquet file, it might not download the entire file, I think, but if you do a
 .count() on a normal text file it might pull the entire file.

 Thanks
 Best Regards

 On Sat, Aug 8, 2015 at 3:12 AM, Akshat Aranya aara...@gmail.com wrote:

 Hi,

 I've been trying to track down some problems with Spark reads being very
 slow with s3n:// URIs (NativeS3FileSystem).  After some digging around, I
 realized that this file system implementation fetches the entire file,
  which isn't really a Spark problem, but it really slows things down when
 trying to just read headers from a Parquet file or just creating partitions
 in the RDD.  Is this something that others have observed before, or am I
 doing something wrong?

 Thanks,
 Akshat





Accessing S3 files with s3n://

2015-08-07 Thread Akshat Aranya
Hi,

I've been trying to track down some problems with Spark reads being very
slow with s3n:// URIs (NativeS3FileSystem).  After some digging around, I
realized that this file system implementation fetches the entire file,
which isn't really a Spark problem, but it really slows things down when
trying to just read headers from a Parquet file or just creating partitions
in the RDD.  Is this something that others have observed before, or am I
doing something wrong?

Thanks,
Akshat


Re: Kryo serialization of classes in additional jars

2015-05-13 Thread Akshat Aranya
I cherry-picked this commit into my local 1.2 branch.  It fixed the problem
with setting spark.serializer, but I get a similar problem with
spark.closure.serializer:

org.apache.spark.SparkException: Failed to register classes with Kryo
  at
org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:100)
  at
org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:152)
  at
org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:114)
  at
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receiveWithLogging$1.applyOrElse(CoarseGrainedExecutorBackend.scala:73)
  at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
  at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
  at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
  at
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
  at
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
  at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
  at
org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
  at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
  at
org.apache.spark.executor.CoarseGrainedExecutorBackend.aroundReceive(CoarseGrainedExecutorBackend.scala:36)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
  at akka.actor.ActorCell.invoke(ActorCell.scala:487)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
  at akka.dispatch.Mailbox.run(Mailbox.scala:220)
  at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
  at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ClassNotFoundException: com.foo.Foo
  at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:270)
  at
org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:93)
  at
org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:93)
  at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
  at
org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:93)
  ... 21 more


On Mon, May 4, 2015 at 5:43 PM, Akshat Aranya aara...@gmail.com wrote:

 Actually, after some digging, I did find a JIRA for it: SPARK-5470.
 The fix for this has gone into master, but it isn't in 1.2.

 On Mon, May 4, 2015 at 2:47 PM, Imran Rashid iras...@cloudera.com wrote:
  Oh, this seems like a real pain.  You should file a jira, I didn't see an
  open issue -- if nothing else just to document the issue.
 
  As you've noted, the problem is that the serializer is created immediately
  in the executors, right when the SparkEnv is created, but the other jars
  aren't downloaded until later.  I think you could work around this with some
  combination of pushing the jars to the cluster manually, and then using
  spark.executor.extraClassPath.
 
  On Wed, Apr 29, 2015 at 6:42 PM, Akshat Aranya aara...@gmail.com
 wrote:
 
  Hi,
 
  Is it possible to register kryo serialization for classes contained in
  jars that are added with spark.jars?  In my experiment it doesn't
 seem to
  work, likely because the class registration happens before the jar is
  shipped to the executor and added to the classloader.  Here's the
 general
  idea of what I want to do:
 
  val sparkConf = new SparkConf(true)
    .set("spark.jars", "foo.jar")
    .setAppName("foo")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

  // register classes contained in foo.jar
  sparkConf.registerKryoClasses(Array(
    classOf[com.foo.Foo],
    classOf[com.foo.Bar]))
 
 



Re: Kryo serialization of classes in additional jars

2015-05-04 Thread Akshat Aranya
Actually, after some digging, I did find a JIRA for it: SPARK-5470.
The fix for this has gone into master, but it isn't in 1.2.

On Mon, May 4, 2015 at 2:47 PM, Imran Rashid iras...@cloudera.com wrote:
 Oh, this seems like a real pain.  You should file a jira, I didn't see an
 open issue -- if nothing else just to document the issue.

 As you've noted, the problem is that the serializer is created immediately
 in the executors, right when the SparkEnv is created, but the other jars
 aren't downloaded until later.  I think you could work around this with some
 combination of pushing the jars to the cluster manually, and then using
 spark.executor.extraClassPath.

 On Wed, Apr 29, 2015 at 6:42 PM, Akshat Aranya aara...@gmail.com wrote:

 Hi,

 Is it possible to register kryo serialization for classes contained in
 jars that are added with spark.jars?  In my experiment it doesn't seem to
 work, likely because the class registration happens before the jar is
 shipped to the executor and added to the classloader.  Here's the general
 idea of what I want to do:

 val sparkConf = new SparkConf(true)
   .set("spark.jars", "foo.jar")
   .setAppName("foo")
   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

 // register classes contained in foo.jar
 sparkConf.registerKryoClasses(Array(
   classOf[com.foo.Foo],
   classOf[com.foo.Bar]))
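As a hedged sketch of the workaround Imran describes above (the /opt/jars/foo.jar path is hypothetical and assumed to have been copied to every worker ahead of time, so the classes are visible when the executor's SparkEnv and its Kryo serializer are created):

    import org.apache.spark.SparkConf

    // The jar is distributed out of band and put on the executor classpath
    // directly, instead of relying on spark.jars.
    val sparkConf = new SparkConf(true)
      .setAppName("foo")
      .set("spark.executor.extraClassPath", "/opt/jars/foo.jar")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    sparkConf.registerKryoClasses(Array(
      classOf[com.foo.Foo],
      classOf[com.foo.Bar]))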






Re: ClassNotFoundException for Kryo serialization

2015-05-02 Thread Akshat Aranya
Now I am running up against some other problem while trying to schedule tasks:

15/05/01 22:32:03 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalStateException: unread block data
at 
java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2419)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1380)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)


I verified that the same configuration works without using Kryo serialization.


On Fri, May 1, 2015 at 9:44 AM, Akshat Aranya aara...@gmail.com wrote:
 I cherry-picked the fix for SPARK-5470 and the problem has gone away.

 On Fri, May 1, 2015 at 9:15 AM, Akshat Aranya aara...@gmail.com wrote:
 Yes, this class is present in the jar that was loaded in the classpath
 of the executor Java process -- it wasn't even lazily added as a part
 of the task execution.  Schema$MyRow is a protobuf-generated class.

 After doing some digging around, I think I might be hitting up against
 SPARK-5470, the fix for which hasn't been merged into 1.2, as far as I
 can tell.

 On Fri, May 1, 2015 at 9:05 AM, Ted Yu yuzhih...@gmail.com wrote:
 bq. Caused by: java.lang.ClassNotFoundException: com.example.Schema$MyRow

 So the above class is in the jar which was in the classpath?
 Can you tell us a bit more about Schema$MyRow ?

 On Fri, May 1, 2015 at 8:05 AM, Akshat Aranya aara...@gmail.com wrote:

 Hi,

 I'm getting a ClassNotFoundException at the executor when trying to
 register a class for Kryo serialization:

 java.lang.reflect.InvocationTargetException
   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)
   at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
   at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
   at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
   at org.apache.spark.SparkEnv$.instantiateClass$1(SparkEnv.scala:243)
   at
 org.apache.spark.SparkEnv$.instantiateClassFromConf$1(SparkEnv.scala:254)
   at org.apache.spark.SparkEnv$.create(SparkEnv.scala:257)
   at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:182)
   at org.apache.spark.executor.Executor.<init>(Executor.scala:87)
   at
 org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receiveWithLogging$1.applyOrElse(CoarseGrainedExecutorBackend.scala:61)
   at
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
   at
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
   at
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
   at
 org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
   at
 org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
   at
 scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
   at
 org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
   at
 org.apache.spark.executor.CoarseGrainedExecutorBackend.aroundReceive(CoarseGrainedExecutorBackend.scala:36)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
   at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
   at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: org.apache.spark.SparkException: Failed to load class to
 register with Kryo
   at
 org.apache.spark.serializer.KryoSerializer$$anonfun$2

ClassNotFoundException for Kryo serialization

2015-05-01 Thread Akshat Aranya
Hi,

I'm getting a ClassNotFoundException at the executor when trying to
register a class for Kryo serialization:

java.lang.reflect.InvocationTargetException
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
  at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
  at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
  at org.apache.spark.SparkEnv$.instantiateClass$1(SparkEnv.scala:243)
  at
org.apache.spark.SparkEnv$.instantiateClassFromConf$1(SparkEnv.scala:254)
  at org.apache.spark.SparkEnv$.create(SparkEnv.scala:257)
  at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:182)
  at org.apache.spark.executor.Executor.<init>(Executor.scala:87)
  at
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receiveWithLogging$1.applyOrElse(CoarseGrainedExecutorBackend.scala:61)
  at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
  at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
  at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
  at
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
  at
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
  at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
  at
org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
  at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
  at
org.apache.spark.executor.CoarseGrainedExecutorBackend.aroundReceive(CoarseGrainedExecutorBackend.scala:36)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
  at akka.actor.ActorCell.invoke(ActorCell.scala:487)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
  at akka.dispatch.Mailbox.run(Mailbox.scala:220)
  at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
  at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.spark.SparkException: Failed to load class to
register with Kryo
  at
org.apache.spark.serializer.KryoSerializer$$anonfun$2.apply(KryoSerializer.scala:66)
  at
org.apache.spark.serializer.KryoSerializer$$anonfun$2.apply(KryoSerializer.scala:61)
  at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
  at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
  at
org.apache.spark.serializer.KryoSerializer.<init>(KryoSerializer.scala:61)
  ... 28 more
Caused by: java.lang.ClassNotFoundException: com.example.Schema$MyRow
  at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:190)
  at
org.apache.spark.serializer.KryoSerializer$$anonfun$2.apply(KryoSerializer.scala:63)

I have verified that when the executor process is launched, my jar is in
the classpath of the command line of the executor.  I expect the class to
be found by the default classloader being used at KryoSerializer.scala:63

Any ideas?


Re: ClassNotFoundException for Kryo serialization

2015-05-01 Thread Akshat Aranya
Yes, this class is present in the jar that was loaded in the classpath
of the executor Java process -- it wasn't even lazily added as a part
of the task execution.  Schema$MyRow is a protobuf-generated class.

After doing some digging around, I think I might be hitting up against
SPARK-5470, the fix for which hasn't been merged into 1.2, as far as I
can tell.

On Fri, May 1, 2015 at 9:05 AM, Ted Yu yuzhih...@gmail.com wrote:
 bq. Caused by: java.lang.ClassNotFoundException: com.example.Schema$MyRow

 So the above class is in the jar which was in the classpath?
 Can you tell us a bit more about Schema$MyRow ?

 On Fri, May 1, 2015 at 8:05 AM, Akshat Aranya aara...@gmail.com wrote:

 Hi,

 I'm getting a ClassNotFoundException at the executor when trying to
 register a class for Kryo serialization:

 java.lang.reflect.InvocationTargetException
   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)
   at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
   at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
   at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
   at org.apache.spark.SparkEnv$.instantiateClass$1(SparkEnv.scala:243)
   at
 org.apache.spark.SparkEnv$.instantiateClassFromConf$1(SparkEnv.scala:254)
   at org.apache.spark.SparkEnv$.create(SparkEnv.scala:257)
   at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:182)
   at org.apache.spark.executor.Executor.<init>(Executor.scala:87)
   at
 org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receiveWithLogging$1.applyOrElse(CoarseGrainedExecutorBackend.scala:61)
   at
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
   at
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
   at
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
   at
 org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
   at
 org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
   at
 scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
   at
 org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
   at
 org.apache.spark.executor.CoarseGrainedExecutorBackend.aroundReceive(CoarseGrainedExecutorBackend.scala:36)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
   at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
   at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: org.apache.spark.SparkException: Failed to load class to
 register with Kryo
   at
 org.apache.spark.serializer.KryoSerializer$$anonfun$2.apply(KryoSerializer.scala:66)
   at
 org.apache.spark.serializer.KryoSerializer$$anonfun$2.apply(KryoSerializer.scala:61)
   at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
   at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
   at
 org.apache.spark.serializer.KryoSerializer.<init>(KryoSerializer.scala:61)
   ... 28 more
 Caused by: java.lang.ClassNotFoundException: com.example.Schema$MyRow
   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
   at java.security.AccessController.doPrivileged(Native Method)
   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
   at java.lang.Class.forName0(Native Method)
   at java.lang.Class.forName(Class.java:190)
   at
 org.apache.spark.serializer.KryoSerializer$$anonfun$2.apply(KryoSerializer.scala:63)

 I have verified that when the executor process is launched, my jar is in
 the classpath

Re: ClassNotFoundException for Kryo serialization

2015-05-01 Thread Akshat Aranya
I cherry-picked the fix for SPARK-5470 and the problem has gone away.

On Fri, May 1, 2015 at 9:15 AM, Akshat Aranya aara...@gmail.com wrote:
 Yes, this class is present in the jar that was loaded in the classpath
 of the executor Java process -- it wasn't even lazily added as a part
 of the task execution.  Schema$MyRow is a protobuf-generated class.

 After doing some digging around, I think I might be hitting up against
 SPARK-5470, the fix for which hasn't been merged into 1.2, as far as I
 can tell.

 On Fri, May 1, 2015 at 9:05 AM, Ted Yu yuzhih...@gmail.com wrote:
 bq. Caused by: java.lang.ClassNotFoundException: com.example.Schema$MyRow

 So the above class is in the jar which was in the classpath?
 Can you tell us a bit more about Schema$MyRow ?

 On Fri, May 1, 2015 at 8:05 AM, Akshat Aranya aara...@gmail.com wrote:

 Hi,

 I'm getting a ClassNotFoundException at the executor when trying to
 register a class for Kryo serialization:

 java.lang.reflect.InvocationTargetException
   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)
   at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
   at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
   at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
   at org.apache.spark.SparkEnv$.instantiateClass$1(SparkEnv.scala:243)
   at
 org.apache.spark.SparkEnv$.instantiateClassFromConf$1(SparkEnv.scala:254)
   at org.apache.spark.SparkEnv$.create(SparkEnv.scala:257)
   at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:182)
   at org.apache.spark.executor.Executor.<init>(Executor.scala:87)
   at
 org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receiveWithLogging$1.applyOrElse(CoarseGrainedExecutorBackend.scala:61)
   at
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
   at
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
   at
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
   at
 org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
   at
 org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
   at
 scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
   at
 org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
   at
 org.apache.spark.executor.CoarseGrainedExecutorBackend.aroundReceive(CoarseGrainedExecutorBackend.scala:36)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
   at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
   at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: org.apache.spark.SparkException: Failed to load class to
 register with Kryo
   at
 org.apache.spark.serializer.KryoSerializer$$anonfun$2.apply(KryoSerializer.scala:66)
   at
 org.apache.spark.serializer.KryoSerializer$$anonfun$2.apply(KryoSerializer.scala:61)
   at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
   at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
   at
 org.apache.spark.serializer.KryoSerializer.<init>(KryoSerializer.scala:61)
   ... 28 more
 Caused by: java.lang.ClassNotFoundException: com.example.Schema$MyRow
   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
   at java.security.AccessController.doPrivileged(Native Method)
   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
   at java.lang.Class.forName0(Native Method)
   at java.lang.Class.forName(Class.java:190

Re: spark with standalone HBase

2015-04-30 Thread Akshat Aranya
Looking at your classpath, it looks like you've compiled Spark yourself.
Depending on which version of Hadoop you've compiled against (looks like
it's Hadoop 2.2 in your case), Spark will have its own version of
protobuf.  You should try making sure both your HBase and Spark are
compiled against the same version of Hadoop.

On Thu, Apr 30, 2015 at 6:54 AM, Ted Yu yuzhih...@gmail.com wrote:

 The error indicates incompatible protobuf versions.

 Please take a look at 4.1.1 under
 http://hbase.apache.org/book.html#basic.prerequisites

 Cheers

 On Thu, Apr 30, 2015 at 3:49 AM, Saurabh Gupta saurabh.gu...@semusi.com
 wrote:

 I was able to solve the issue by setting

  SparkConf sconf = new SparkConf().setAppName("App").setMaster("local")

 and

  conf.set("zookeeper.znode.parent", "/hbase-unsecure")

 Standalone hbase has a table 'test'
  hbase(main):001:0> scan 'test'
  ROW                COLUMN+CELL
  row1               column=cf:a, timestamp=1430234895637, value=value1
  row2               column=cf:b, timestamp=1430234907537, value=value2
  row3               column=cf:c, timestamp=1430234918284, value=value3

 Now facing this issue:

 ERROR TableInputFormat: java.io.IOException:
 java.lang.reflect.InvocationTargetException
 at
 org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:416)
 at
 org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:393)
 at
 org.apache.hadoop.hbase.client.HConnectionManager.getConnection(HConnectionManager.java:274)
 at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:194)
 at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:156)
 at
 org.apache.hadoop.hbase.mapreduce.TableInputFormat.setConf(TableInputFormat.java:101)
 at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:91)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:1632)
 at org.apache.spark.rdd.RDD.count(RDD.scala:1012)
 at org.apache.spark.examples.HBaseTest$.main(HBaseTest.scala:58)
 at org.apache.spark.examples.HBaseTest.main(HBaseTest.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:607)
 at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:167)
 at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:190)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 Caused by: java.lang.reflect.InvocationTargetException
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at
 org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:414)
 ... 23 more
 Caused by: java.lang.VerifyError: class
 org.apache.hadoop.hbase.protobuf.generated.ClientProtos$Result overrides
 final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet;
 at java.lang.ClassLoader.defineClass1(Native Method)
 at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
 at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
 at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
 at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at
 org.apache.hadoop.hbase.protobuf.ProtobufUtil.<clinit>(ProtobufUtil.java:176)
 at org.apache.hadoop.hbase.ClusterId.parseFrom(ClusterId.java:64)
 at
 org.apache.hadoop.hbase.zookeeper.ZKClusterId.readClusterIdZNode(ZKClusterId.java:69)
 at
 org.apache.hadoop.hbase.client.ZooKeeperRegistry.getClusterId(ZooKeeperRegistry.java:83)
 at
 org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.retrieveClusterId(HConnectionManager.java:857)
 at
 

Kryo serialization of classes in additional jars

2015-04-29 Thread Akshat Aranya
Hi,

Is it possible to register kryo serialization for classes contained in jars
that are added with spark.jars?  In my experiment it doesn't seem to
work, likely because the class registration happens before the jar is
shipped to the executor and added to the classloader.  Here's the general
idea of what I want to do:

   val sparkConf = new SparkConf(true)
     .set("spark.jars", "foo.jar")
     .setAppName("foo")
     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

 // register classes contained in foo.jar
 sparkConf.registerKryoClasses(Array(
   classOf[com.foo.Foo],
   classOf[com.foo.Bar]))


When are TaskCompletionListeners called?

2015-04-17 Thread Akshat Aranya
Hi,

I'm trying to figure out when TaskCompletionListeners are called -- are
they called at the end of the RDD's compute() method, or after the
iterator returned by compute() has been fully consumed?

To put it another way, is this OK:

class DatabaseRDD[T] extends RDD[T] {

  def compute(...): Iterator[T] = {
    val session = // acquire a DB session
    TaskContext.get.addTaskCompletionListener { (context) =>
      session.release()
    }

    val iterator = session.query(...)
    iterator
  }
}


Re: Spark SQL code generation

2015-04-06 Thread Akshat Aranya
Thanks for the info, Michael.  Is there a reason to do so, as opposed to
shipping out the bytecode and loading it via the classloader?  Is it more
complex?  I can imagine caching to be effective for repeated queries, but
not when the subsequent queries are different.

On Mon, Apr 6, 2015 at 2:41 PM, Michael Armbrust mich...@databricks.com
wrote:

 It is generated and cached on each of the executors.

 On Mon, Apr 6, 2015 at 2:32 PM, Akshat Aranya aara...@gmail.com wrote:

 Hi,

 I'm curious as to how Spark does code generation for SQL queries.

 Following through the code, I saw that an expression is parsed and
 compiled into a class using Scala reflection toolbox.  However, it's
 unclear to me whether the actual byte code is generated on the master or on
  each of the executors.  If it is generated on the master, how is the byte code
 shipped out to the executors?

 Thanks,
 Akshat


 https://databricks.com/blog/2014/06/02/exciting-performance-improvements-on-the-horizon-for-spark-sql.html





Spark SQL code generation

2015-04-06 Thread Akshat Aranya
Hi,

I'm curious as to how Spark does code generation for SQL queries.

Following through the code, I saw that an expression is parsed and compiled
into a class using Scala reflection toolbox.  However, it's unclear to me
whether the actual byte code is generated on the master or on each of the
executors.  If it is generated on the master, how is the byte code shipped out
to the executors?

Thanks,
Akshat

https://databricks.com/blog/2014/06/02/exciting-performance-improvements-on-the-horizon-for-spark-sql.html


Re: Getting to proto buff classes in Spark Context

2015-02-26 Thread Akshat Aranya
My guess would be that you are packaging too many things in your job, which
is causing problems with the classpath.  When your jar goes in first, you
get the correct version of protobuf, but some other version of something
else.  When your jar goes in later, other things work, but protobuf
breaks.  This is just a guess though; take a look at what you're packaging
in your jar and look for things that Spark or Kafka could also be using.
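One way people deal with this kind of conflict, as a hedged sketch only (it assumes the job jar is built with a version of sbt-assembly recent enough to support shading rules; the relocated package name is made up), is to relocate the protobuf packages inside the fat jar so they cannot clash with whatever Spark or Kafka bring in:

    // build.sbt fragment (requires the sbt-assembly plugin): rename the
    // bundled protobuf classes so the job's copy and the cluster's copy can
    // coexist on the same classpath.
    assemblyShadeRules in assembly := Seq(
      ShadeRule.rename("com.google.protobuf.**" -> "my.shaded.protobuf.@1").inAll
    )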

On Thu, Feb 26, 2015 at 10:06 AM, necro351 . necro...@gmail.com wrote:

 Hello everyone,

 We are trying to decode a message inside a Spark job that we receive from
  Kafka. The message is encoded using Proto Buff. The problem is that when
  decoding we get class-not-found exceptions. We have tried remedies we found
  online on Stack Exchange and in mailing list archives, but nothing seems to work.

 (This question is a re-ask, but we really cannot figure this one out.)

 We created a standalone repository with a very simple Spark job that
 exhibits the above issues. The spark job reads the messages from the FS,
 decodes them, and prints them. Its easy to checkout and try to see the
 exception yourself: just uncomment the code that prints the messages from
 within the RDD. The only sources are the generated Proto Buff java sources
 and a small Spark Job that decodes a message. I'd appreciate if anyone
 could take a look.

 https://github.com/vibhav/spark-protobuf

 We tried a couple remedies already.

 Setting spark.files.userClassPathFirst didn't fix the problem for us. I
 am not very familiar with the Spark and Scala environment, so please
 correct any incorrect assumptions or statements I make.

 However, I don't believe this to be a classpath visibility issue. I wrote
 a small helper method to print out the classpath from both the driver and
 worker, and the output is identical. (I'm printing out
  System.getProperty("java.class.path") -- is there a better way to do this
 or check the class path?). You can print out the class paths the same way
 we are from the example project above.

 Furthermore, userClassPathFirst seems to have a detrimental effect on
 otherwise working code, which I cannot explain or do not understand.

 For example, I created a trivial RDD as such:

  val l = List(1, 2, 3)
  sc.makeRDD(l).foreach((x: Int) => {
    println(x.toString)
  })

 With userClassPathFirst set, I encounter a java.lang.ClassCastException
 trying to execute that code. Is that to be expected? You can re-create this
 issue by commenting out the block of code that tries to print the above in
 the example project we linked to above.

 We also tried dynamically adding the jar with .addJar to the Spark Context
 but this seemed to have no effect.

 Thanks in advance for any help y'all can provide.



Missing tasks

2015-02-26 Thread Akshat Aranya
I am seeing a problem with a Spark job in standalone mode.  Spark master's
web interface shows a task RUNNING on a particular executor, but the logs
of the executor do not show the task ever being assigned to it, that is,
such a line is missing from the log:

15/02/25 16:53:36 INFO executor.CoarseGrainedExecutorBackend: Got assigned
task 27.

The missing task wasn't the first task executing on the executor.  Also,
subsequent tasks are running on other executor threads on this executor.

Any good way to figure out what happened here?

-Akshat


Number of parallel tasks

2015-02-25 Thread Akshat Aranya
I have Spark running in standalone mode with 4 executors, each executor
with 5 cores (spark.executor.cores=5).  However, when I'm processing
an RDD with ~90,000 partitions, I only get 4 parallel tasks.  Shouldn't I
be getting 4x5=20 parallel task executions?


Standalone Spark program

2014-12-18 Thread Akshat Aranya
Hi,

I am building a Spark-based service which requires initialization of a
SparkContext in a main():

def main(args: Array[String]) {
  val conf = new SparkConf(false)
    .setMaster("spark://foo.example.com:7077")
    .setAppName("foobar")

  val sc = new SparkContext(conf)
  val rdd = sc.parallelize(0 until 255)
  val res = rdd.mapPartitions(it => it).take(1)
  println(s"res=$res")
  sc.stop()
}

This code works fine via REPL, but not as a standalone program; it causes a
ClassNotFoundException.  This has me confused about how code is shipped out
to executors.  When used via the REPL, does the mapPartitions closure, it => it,
get sent out when the REPL statement is executed?  When this code is run as
a standalone program (not via spark-submit), is the compiled code expected
to be present at the executor?

Thanks,
Akshat


Stateful mapPartitions

2014-12-04 Thread Akshat Aranya
Is it possible to have some state across multiple calls to mapPartitions on
each partition, for instance, if I want to keep a database connection open?


Re: Stateful mapPartitions

2014-12-04 Thread Akshat Aranya
I want to have a database connection per partition of the RDD, and then
reuse that connection whenever mapPartitions is called, which results in
compute being called on the partition.
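A hedged sketch of one common pattern for this (DbConnection and lookup are placeholder names, not a real API): keep the connection in a JVM-level singleton, so each executor opens it once and every mapPartitions call, and hence every compute() on that executor, reuses it.

    // Hypothetical stand-in for a real database client.
    class DbConnection { def lookup(x: Long): Long = x }

    object ConnectionHolder {
      // Initialized lazily, once per executor JVM, and shared by all tasks
      // (and therefore all mapPartitions/compute() calls) running there.
      lazy val conn = new DbConnection
    }

    // rdd: RDD[Long] is assumed here.  Only the reference to the singleton is
    // used inside the closure, so no connection object is ever serialized.
    val results = rdd.mapPartitions { iter =>
      val conn = ConnectionHolder.conn
      iter.map(conn.lookup)
    }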

On Thu, Dec 4, 2014 at 11:07 AM, Paolo Platter paolo.plat...@agilelab.it
wrote:

   Could you provide some further details?
  What do you need to do with the DB connection?

 Paolo

  Sent from my Windows Phone
   --
  From: Akshat Aranya aara...@gmail.com
  Sent: 04/12/2014 18:57
  To: user@spark.apache.org
  Subject: Stateful mapPartitions

  Is it possible to have some state across multiple calls to mapPartitions
 on each partition, for instance, if I want to keep a database connection
 open?



Executor failover

2014-11-26 Thread Akshat Aranya
Hi,

I have a question regarding failure of executors: how does Spark reassign
partitions or tasks when executors fail?  Is it necessary that new
executors have the same executor IDs as the ones that were lost, or are
these IDs irrelevant for failover?


Re: advantages of SparkSQL?

2014-11-24 Thread Akshat Aranya
Parquet is a column-oriented format, which means that you need to read in
less data from the file system if you're only interested in a subset of
your columns.  Also, Parquet pushes down selection predicates, which can
eliminate needless deserialization of rows that don't match a selection
criterion.  Other than that, you would also get compression, and likely
save the processor cycles otherwise spent parsing lines from text files.
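A Scala version of the snippet in the question below, as a rough sketch (the path and field index are made up for illustration): the file is loaded through the SQL context but then processed with plain RDD operations, with no table registration or SQL involved.

    val data = sqlContext.parquetFile("hdfs:///data/events.parquet")
    // The result is a SchemaRDD, which is also an RDD[Row], so ordinary
    // transformations work on it directly.
    val results = data.map(row => row.getString(0).length)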



On Mon, Nov 24, 2014 at 8:20 AM, mrm ma...@skimlinks.com wrote:

 Hi,

 Is there any advantage to storing data as a parquet format, loading it
 using
 the sparkSQL context, but never registering as a table/using sql on it?
 Something like:
 data = sqc.parquetFile(path)
 results =  data.map(lambda x: applyfunc(x.field))

 Is this faster/more optimised than having the data stored as a text file
 and
 using Spark (non-SQL) to process it?







Spark and Play

2014-11-11 Thread Akshat Aranya
Hi,

Sorry if this has been asked before; I didn't find a satisfactory answer
when searching.  How can I integrate a Play application with Spark?  I'm
running into issues with akka-actor versions.  Play 2.2.x uses akka-actor
2.0, whereas Play 2.3.x uses akka-actor 2.3.4, neither of which works well
with Spark 1.1.0.  Is there something I should do with libraryDependencies
in my build.sbt to make it work?

Thanks,
Akshat


Mapping SchemaRDD/Row to JSON

2014-11-10 Thread Akshat Aranya
Hi,

Does there exist a way to serialize Row objects to JSON?  In the absence of
such a way, is the right way to go:

* get hold of schema using SchemaRDD.schema
* Iterate through each individual Row as a Seq and use the schema to
convert values in the row to JSON types.
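A hedged sketch of exactly that approach (Spark 1.x SchemaRDD API; the source path is hypothetical, only flat non-nested rows are handled, and every value is simply quoted as a string):

    val schemaRdd = sqlContext.parquetFile("people.parquet")
    val fieldNames = schemaRdd.schema.fields.map(_.name)
    val jsonLines = schemaRdd.map { row =>
      // Pair each field name with the corresponding value in the Row.
      fieldNames.zip(row.toSeq)
        .map { case (name, value) => "\"" + name + "\": \"" + value + "\"" }
        .mkString("{", ", ", "}")
    }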

Thanks,
Akshat


Re: Spark as key/value store?

2014-10-22 Thread Akshat Aranya
Spark, in general, is good for iterating through an entire dataset again
and again.  All operations are expressed in terms of iteration through all
the records of at least one partition.  You may want to look at IndexedRDD (
https://issues.apache.org/jira/browse/SPARK-2365) that aims to improve
point queries.  In general though, Spark is unlikely to outperform KV
stores because of the nature of scheduling a job for every operation.

On Wed, Oct 22, 2014 at 7:51 AM, Hajime Takase placeofnomemor...@gmail.com
wrote:

 Hi,
  Is it possible to use Spark as a clustered key/value store (say, like
  redis-cluster or Hazelcast)? Will it perform as well on writes/reads or other
  operations?
  My main urge is to use the same RDD from several different SparkContexts
  without saving to disk or using the spark-job server, but I'm curious if someone
  has already tried using Spark as a key/value store.

 Thanks,

 Hajime





Primitive arrays in Spark

2014-10-21 Thread Akshat Aranya
This is as much of a Scala question as a Spark question

I have an RDD:

val rdd1: RDD[(Long, Array[Long])]

This RDD has duplicate keys that I can collapse as such:

val rdd2: RDD[(Long, Array[Long])] = rdd1.reduceByKey((a, b) => a ++ b)

If I start with an Array of primitive longs in rdd1, will rdd2 also have
Arrays of primitive longs?  I suspect, based on my memory usage, that this
is not the case.

Also, would it be more efficient to do this:

val rdd1: RDD[(Long, ArrayBuffer[Long])]

and then

val rdd2: RDD[(Long, Array[Long])] = rdd1.reduceByKey((a, b) =>
a ++ b).mapValues(_.toArray)


Attaching schema to RDD created from Parquet file

2014-10-17 Thread Akshat Aranya
Hi,

How can I convert an RDD loaded from a Parquet file into its original type:

case class Person(name: String, age: Int)

val rdd: RDD[Person] = ...
rdd.saveAsParquetFile("people")

val rdd2 = sqlContext.parquetFile("people")

How can I map rdd2 back into an RDD[Person]?  All of the examples just show
how to use the RDD loaded from Parquet using SQL.
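A hedged sketch of one way to do this with the Spark 1.x Row API; the field positions (0 = name, 1 = age) are an assumption about how the file was written, not something guaranteed here:

    val people: RDD[Person] = sqlContext.parquetFile("people").map { row =>
      Person(row.getString(0), row.getInt(1))
    }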


Re: bug with MapPartitions?

2014-10-17 Thread Akshat Aranya
There seems to be some problem with what gets captured in the closure
that's passed into the mapPartitions (myfunc in your case).

I've had a similar problem before:

http://apache-spark-user-list.1001560.n3.nabble.com/TaskNotSerializableException-when-running-through-Spark-shell-td16574.html

Try putting your myFunc in an object:

object Mapper {
  def myFunc = ...
}
val r = sc.parallelize(c).mapPartitions(Mapper.myFunc).collect()

On Fri, Oct 17, 2014 at 7:33 AM, davidkl davidkl...@hotmail.com wrote:

 Hello,

  Maybe there is something I do not understand, but I believe this
  code
 should not throw any serialization error when I run this in the spark
 shell.
 Using similar code with map instead of mapPartitions works just fine.

 import java.io.BufferedInputStream
 import java.io.FileInputStream
 import com.testing.DataPacket

 val inStream = new BufferedInputStream(new FileInputStream(inputFile))
 val p = new DataPacket(inStream)
 val c = Array(p)
  def myfunc[T](iter: Iterator[T]) : Iterator[String] = {
   var res = List[String]()
   while (iter.hasNext)
   {
 val cur = iter.next;
 res .::= ()
   }
   res.iterator
 }
 var r = sc.parallelize(c).mapPartitions(myfunc).collect()

 This throws the following:

 org.apache.spark.SparkException: Task not serializable
 at

 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at
 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
 at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
 ...
 ...
 Caused by: java.io.NotSerializableException: java.io.BufferedInputStream
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 ...
 ...

 Why is this code failing? The constructor of DataPacket just reads data,
 but
 does not keep any reference to the BufferedInputStream. Note that this is
 not the real code, but a simplification while trying to isolate the cause
 of
 the error I get. Using map on this instead of MapPartitions works fine.








Re: Larger heap leads to perf degradation due to GC

2014-10-16 Thread Akshat Aranya
I just want to pitch in and say that I ran into the same problem when
running with 64GB executors.  For example, some of the tasks take 5 minutes
to execute, out of which 4 minutes are spent in GC.  I'll try out smaller
executors.

On Mon, Oct 6, 2014 at 6:35 PM, Otis Gospodnetic otis.gospodne...@gmail.com
 wrote:

 Hi,

 The other option to consider is using G1 GC, which should behave better
 with large heaps.  But pointers are not compressed in heaps > 32 GB in
 size, so you may be better off staying under 32 GB.

 Otis
 --
 Monitoring * Alerting * Anomaly Detection * Centralized Log Management
 Solr & Elasticsearch Support * http://sematext.com/


 On Mon, Oct 6, 2014 at 8:08 PM, Mingyu Kim m...@palantir.com wrote:

 Ok, cool. This seems to be a general issue in the JVM with very large heaps. I
 agree that the best workaround would be to keep the heap size below 32GB.
 Thanks guys!

 Mingyu

 From: Arun Ahuja aahuj...@gmail.com
 Date: Monday, October 6, 2014 at 7:50 AM
 To: Andrew Ash and...@andrewash.com
 Cc: Mingyu Kim m...@palantir.com, user@spark.apache.org 
 user@spark.apache.org, Dennis Lawler dlaw...@palantir.com
 Subject: Re: Larger heap leads to perf degradation due to GC

 We have used the strategy that you suggested, Andrew - using many workers
 per machine and keeping the heaps small (< 20gb).

 Using a large heap resulted in workers hanging or not responding (leading
 to timeouts).  The same dataset/job for us will fail (most often due to
 akka disassociation or fetch failure errors) with 10 cores / 100 executors and
 60 GB per executor, while it succeeds with 1 core / 1000 executors / 6 GB per
 executor.

 When the job does succeed with more cores per executor and a larger heap,
 it is usually much slower than with the smaller executors (the same 8-10 min job
 taking 15-20 min to complete).

 The unfortunate downside of this has been that we have had some large
 broadcast variables which may not fit into memory (and are unnecessarily
 duplicated) when using the smaller executors.

 Most of this is anecdotal but for the most part we have had more success
 and consistency with more executors with smaller memory requirements.

 On Sun, Oct 5, 2014 at 7:20 PM, Andrew Ash and...@andrewash.com wrote:

 Hi Mingyu,

 Maybe we should be limiting our heaps to 32GB max and running multiple
 workers per machine to avoid large GC issues.

 For a 128GB memory, 32 core machine, this could look like:

 SPARK_WORKER_INSTANCES=4
 SPARK_WORKER_MEMORY=32
 SPARK_WORKER_CORES=8

 Are people running with large (32GB+) executor heaps in production?  I'd
 be curious to hear if so.

 Cheers!
 Andrew

 On Thu, Oct 2, 2014 at 1:30 PM, Mingyu Kim m...@palantir.com wrote:

 This issue definitely needs more investigation, but I just wanted to
 quickly check if anyone has run into this problem or has general guidance
 around it. We’ve seen a performance degradation with a large heap on a
 simple map task (I.e. No shuffle). We’ve seen the slowness starting around
 from 50GB heap. (I.e. spark.executor.memoty=50g) And, when we checked the
 CPU usage, there were just a lot of GCs going on.

 Has anyone seen a similar problem?

 Thanks,
 Mingyu







TaskNotSerializableException when running through Spark shell

2014-10-16 Thread Akshat Aranya
Hi,

Can anyone explain how things get captured in a closure when running through
the REPL?  For example:

def foo(..) = { .. }

rdd.map(foo)

sometimes complains about classes not being serializable that are
completely unrelated to foo.  This happens even when I write it like this:

object Foo {
  def foo(..) = { .. }
}

rdd.map(Foo.foo)

It also doesn't happen all the time.


One pass compute() to produce multiple RDDs

2014-10-09 Thread Akshat Aranya
Hi,

Is there a good way to materialize derivative RDDs from, say, a HadoopRDD
while reading in the data only once?  One way to do so would be to cache
the HadoopRDD and then create derivative RDDs, but that would require
enough RAM to cache the HadoopRDD, which is not an option in my case.

Thanks,
Akshat


Re: Is there a way to look at RDD's lineage? Or debug a fault-tolerance error?

2014-10-08 Thread Akshat Aranya
Using a var for RDDs in this way is not going to work.  In this example,
tx1.zip(tx2) would create an RDD that depends on tx2, but then soon after
that, you change what tx2 means, so you would end up having a circular
dependency.

On Wed, Oct 8, 2014 at 12:01 PM, Sung Hwan Chung coded...@cs.stanford.edu
wrote:

 My job is not being fault-tolerant (e.g., when there's a fetch failure or
 something).

  The lineage of the RDDs is constantly updated every iteration. However, I
 think that when there's a failure, the lineage information is not being
 correctly reapplied.

 It goes something like this:

 val rawRDD = read(...)
 val repartRDD = rawRDD.repartition(X)

 val tx1 = repartRDD.map(...)
 var tx2 = tx1.map(...)

 while (...) {
   tx2 = tx1.zip(tx2).map(...)
 }


  Is there any way to monitor an RDD's lineage, maybe even including? I want to
  make sure that there are no unexpected things happening.



Relation between worker memory and executor memory in standalone mode

2014-10-01 Thread Akshat Aranya
Hi,

What's the relationship between Spark worker and executor memory settings
in standalone mode?  Do they work independently or does the worker cap
executor memory?

Also, is the number of concurrent executors per worker capped by the number
of CPU cores configured for the worker?


Re: Relation between worker memory and executor memory in standalone mode

2014-10-01 Thread Akshat Aranya
On Wed, Oct 1, 2014 at 11:33 AM, Akshat Aranya aara...@gmail.com wrote:



 On Wed, Oct 1, 2014 at 11:00 AM, Boromir Widas vcsub...@gmail.com wrote:

 1. worker memory caps executor.
 2. With default config, every job gets one executor per worker. This
 executor runs with all cores available to the worker.

  By "job" do you mean one SparkContext or one stage execution within a
 program?  Does that also mean that two concurrent jobs will get one
 executor each at the same time?


Experimenting with this some more, I figured out that an executor takes
away spark.executor.memory amount of memory from the configured worker
memory.  It also takes up all the cores, so even if there is still some
memory left, there are no cores left for starting another executor.  Is my
assessment correct? Is there no way to configure the number of cores that
an executor can use?
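For reference, a hedged sketch of the standalone-mode knobs discussed in this thread (the values are arbitrary): spark.executor.memory is what the executor takes away from the worker's configured memory, and spark.cores.max caps the total cores the application takes across the cluster.

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.executor.memory", "8g") // per-executor heap, deducted from the worker's memory
      .set("spark.cores.max", "8")        // total cores this application may use cluster-wide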




 On Wed, Oct 1, 2014 at 11:04 AM, Akshat Aranya aara...@gmail.com wrote:

 Hi,

 What's the relationship between Spark worker and executor memory
 settings in standalone mode?  Do they work independently or does the worker
 cap executor memory?

 Also, is the number of concurrent executors per worker capped by the
 number of CPU cores configured for the worker?






Determining number of executors within RDD

2014-10-01 Thread Akshat Aranya
Hi,

I want to implement an RDD wherein the decision about the number of partitions is
based on the number of executors that have been set up. Is there some way I
can determine the number of executors within the getPartitions() call?


Re: partitioned groupBy

2014-09-17 Thread Akshat Aranya
Patrick,

If I understand this correctly, I won't be able to do this in the closure
provided to mapPartitions() because that's going to be stateless, in the
sense that a hash map that I create within the closure would only be useful
for one call of MapPartitionsRDD.compute().  I guess I would need to
override mapPartitions() directly within my RDD.  Right?
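A hedged sketch of the per-partition inversion Patrick suggests below (the types are illustrative, e.g. rdd: RDD[(String, Seq[String])]; it assumes each partition's inverse map fits in memory, and no shuffle is involved because every record stays in its original partition):

    import scala.collection.mutable

    val inverted = rdd.mapPartitions { iter =>
      // Build the inverse mapping (value -> keys) locally within this
      // partition only; nothing crosses partition boundaries.
      val inverse = mutable.HashMap.empty[String, List[String]]
      for ((k, vs) <- iter; v <- vs)
        inverse(v) = k :: inverse.getOrElse(v, Nil)
      inverse.iterator
    }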

On Tue, Sep 16, 2014 at 4:57 PM, Patrick Wendell pwend...@gmail.com wrote:

 If each partition can fit in memory, you can do this using
 mapPartitions and then building an inverse mapping within each
 partition. You'd need to construct a hash map within each partition
 yourself.

 On Tue, Sep 16, 2014 at 4:27 PM, Akshat Aranya aara...@gmail.com wrote:
   I have a use case where my RDD is set up as such:
 
  Partition 0:
   K1 -> [V1, V2]
   K2 -> [V2]
 
  Partition 1:
   K3 -> [V1]
   K4 -> [V3]
 
  I want to invert this RDD, but only within a partition, so that the
  operation does not require a shuffle.  It doesn't matter if the
 partitions
  of the inverted RDD have non unique keys across the partitions, for
 example:
 
  Partition 0:
   V1 -> [K1]
   V2 -> [K1, K2]
 
  Partition 1:
   V1 -> [K3]
   V3 -> [K4]
 
  Is there a way to do only a per-partition groupBy, instead of shuffling
 the
  entire data?
 



Indexed RDD

2014-09-16 Thread Akshat Aranya
Hi,

I'm trying to implement a custom RDD that essentially works as a
distributed hash table, i.e. the key space is split up into partitions and
within a partition, an element can be looked up efficiently by the key.
However, the RDD lookup() function (in PairRDDFunctions) is implemented in
a way that iterates through all elements of a partition and finds the matching
ones.  Is there a better way to do what I want to do, short of just
implementing new methods on the custom RDD?

Thanks,
Akshat


partitioned groupBy

2014-09-16 Thread Akshat Aranya
I have a use case where my RDD is set up as such:

Partition 0:
K1 -> [V1, V2]
K2 -> [V2]

Partition 1:
K3 -> [V1]
K4 -> [V3]

I want to invert this RDD, but only within a partition, so that the
operation does not require a shuffle.  It doesn't matter if the partitions
of the inverted RDD have non unique keys across the partitions, for example:

Partition 0:
V1 -> [K1]
V2 -> [K1, K2]

Partition 1:
V1 -> [K3]
V3 -> [K4]

Is there a way to do only a per-partition groupBy, instead of shuffling the
entire data?