Print in JavaNetworkWordCount
Hi guys, can somebody help me? Where do I change the print() function to print more than 10 lines on screen? Is there a way to print the total count of all words in a batch? Best Regards
TorrentBroadcast + persist = bug
Hello, I think there is a bug with TorrentBroadcast in the latest release (0.8.1). The problem is that even a simple job (e.g., rdd.count) hangs waiting for some tasks to finish. Here is how to reproduce the problem:

1) Configure Spark such that node X is the master and also one of the workers (e.g., 5 nodes = 5 workers and 1 master)
2) Activate TorrentBroadcast
3) Use the Kryo serializer (the problem happens more often than with the Java serializer)
4) Read some file from HDFS, persist the RDD, and call count

In almost 80% of the cases (~50% with the Java serializer), the count job hangs waiting for two tasks from node X to finish. The problem *does not* appear if: 1) I separate the master from the worker nodes, or 2) I use HttpBroadcast, or 3) I do not persist the RDD. The code is below.

def main(args: Array[String]): Unit = {
  System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  System.setProperty("spark.kryo.registrator", "test.MyRegistrator")
  System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
  val sc = new SparkContext(...)
  val file = "hdfs://server:9000/user/xxx/Test.out" // ~750MB
  val rdd = sc.textFile(file)
  rdd.persist
  println("Counting: " + rdd.count)
}

Best regards, Milos
ExternalAppendOnlyMap throws NoSuchElementException
Hi, I'm trying out the latest master branch of Spark for the exciting external hashmap feature. I have code that runs correctly on Spark 0.8.1, and I only made a change so that it spills to disk more easily. However, I encounter a few task failures with java.util.NoSuchElementException:

java.util.NoSuchElementException
  org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.next(ExternalAppendOnlyMap.scala:277)
  org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.next(ExternalAppendOnlyMap.scala:212)
  org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:29)

And the job seems to fail to recover. Can anyone give some suggestions on how to investigate the issue? Thanks, Jiacheng Guo
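For reference, a minimal sketch of the kind of change being described; the property names are assumptions based on the 0.9-era external-spill feature, not taken from the original message:

// Sketch: enable shuffle spilling and shrink the in-memory budget so the
// ExternalAppendOnlyMap spills to disk sooner (assumed property names).
System.setProperty("spark.shuffle.spill", "true")
System.setProperty("spark.shuffle.memoryFraction", "0.1")
val sc = new SparkContext("local[4]", "spill-test")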
Loss was due to java.lang.ClassNotFoundException java.lang.ClassNotFoundException: scala.None$ error when mysql-async is added to build.sbt
My application is failing with a 'Loss was due to java.lang.ClassNotFoundException: scala.None$' error when the mysql-async library (https://github.com/mauricio/postgresql-async) is added to build.sbt. I've added the following line to build.sbt:

"com.github.mauricio" %% "mysql-async" % "0.2.11"

When this line is commented out the application runs just fine. Could you please help? I'm a newbie with Scala and Spark but would like to create an async connection to mysql to import my data definitions (i.e., which datasets there are, where to find them in HDFS, etc.) in order to create dynamic RDDs based on definitions in mysql. I'm getting the following error message:

23:43:54.429 [spark-akka.actor.default-dispatcher-3] INFO o.a.s.s.local.LocalTaskSetManager - Loss was due to java.lang.ClassNotFoundException
java.lang.ClassNotFoundException: scala.None$
  at java.net.URLClassLoader$1.run(Unknown Source)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(Unknown Source)
  at java.lang.ClassLoader.loadClass(Unknown Source)
  at java.lang.ClassLoader.loadClass(Unknown Source)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Unknown Source)
  at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:36)
  at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
  at java.io.ObjectInputStream.readClassDesc(Unknown Source)
  at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
  at java.io.ObjectInputStream.readObject0(Unknown Source)
  at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
  at java.io.ObjectInputStream.readSerialData(Unknown Source)
  at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
  at java.io.ObjectInputStream.readObject0(Unknown Source)
  at java.io.ObjectInputStream.readObject(Unknown Source)
  at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
  at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61)
  at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:129)
  at java.io.ObjectInputStream.readExternalData(Unknown Source)
  at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
  at java.io.ObjectInputStream.readObject0(Unknown Source)
  at java.io.ObjectInputStream.readObject(Unknown Source)
  at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
  at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:61)
  at org.apache.spark.scheduler.local.LocalScheduler.runTask(LocalScheduler.scala:191)
  at org.apache.spark.scheduler.local.LocalActor$$anonfun$launchTask$1$$anon$1.run(LocalScheduler.scala:68)
  at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
  at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
  at java.util.concurrent.FutureTask.run(Unknown Source)
  at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
  at java.lang.Thread.run(Unknown Source)

23:43:54.438 [DAGScheduler] DEBUG o.a.spark.scheduler.DAGScheduler - Got event of type org.apache.spark.scheduler.TaskSetFailed
23:43:54.443 [test-akka.actor.default-dispatcher-3] INFO o.a.spark.scheduler.DAGScheduler - Failed to run count at DataSession.scala:26
23:43:54.447 [spark-akka.actor.default-dispatcher-3] INFO o.a.s.scheduler.local.LocalScheduler - Remove TaskSet 0.0 from pool
[ERROR] [01/19/2014 23:43:54.455] [test-akka.actor.default-dispatcher-6] [akka://test/user/testServer/1/771192171] Job failed: Task 0.0:0 failed more than 4 times; aborting job java.lang.ClassNotFoundException: scala.None$
org.apache.spark.SparkException: Job failed: Task 0.0:0 failed more than 4 times; aborting job java.lang.ClassNotFoundException: scala.None$
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:761)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:759)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:759)
  at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:380)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:442)
  at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:150)
Re: Loss was due to java.lang.ClassNotFoundException java.lang.ClassNotFoundException: scala.None$ error when mysql-async is added to build.sbt
Solved: mysql-async requires Scala 2.10.3 and I was compiling with version 2.10.2.

On Mon, Jan 20, 2014 at 1:29 PM, Richard Siebeling rsiebel...@gmail.com wrote:
My application is failing with a 'Loss was due to java.lang.ClassNotFoundException: scala.None$' error when the mysql-async library (https://github.com/mauricio/postgresql-async) is added to build.sbt. I've added the following line to build.sbt:

"com.github.mauricio" %% "mysql-async" % "0.2.11"

When this line is commented out the application runs just fine. Could you please help? I'm a newbie with Scala and Spark but would like to create an async connection to mysql to import my data definitions (i.e., which datasets there are, where to find them in HDFS, etc.) in order to create dynamic RDDs based on definitions in mysql.
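For reference, a build.sbt sketch reflecting the fix described above; the exact layout is an assumption, while the versions come from the thread:

// Pin the Scala version to the one mysql-async 0.2.11 was built against.
scalaVersion := "2.10.3"

libraryDependencies += "com.github.mauricio" %% "mysql-async" % "0.2.11"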
Re: cannot run sbt/sbt assembly
Use Scala 2.9.2. From what I read, 2.9.3 is not supported. You might also want to try a later version of the JDK, e.g. 7.0_51.

On Friday, January 17, 2014 1:07 PM, Kal El pinu.datri...@yahoo.com wrote:
Hello, I have tried to assemble Spark (sbt/sbt assembly) with different versions of Java (OpenJDK, Sun HotSpot) on an ARM v7 Cortex-A15 architecture (Samsung Exynos SoC) and I got the following error:

# A fatal error has been detected by the Java Runtime Environment:
#
# Internal Error (os_linux_zero.cpp:285), pid=3039, tid=50648176
# fatal error: caught unhandled signal 11
#
# JRE version: 7.0_21-b02

I have attached the log from one of the attempts. Can anyone figure out why this is happening? P.S.: I am using Scala 2.9.3 and I have used Spark on an x86 machine before, so this is not the first time I am setting up Spark. Thanks, Alex
Re: Print in JavaNetworkWordCount
Hi Eduardo, you can do arbitrary stuff with the data in a DStream using the operation foreachRDD:

yourDStream.foreachRDD(rdd => {
  // Get and print first n elements
  val firstN = rdd.take(n)
  println("First N elements = " + firstN)
  // Count the number of elements in each batch
  println("RDD has " + rdd.count() + " elements")
})

Alternatively, just for printing the counts, you can also do yourDStream.count.print(). Hope this helps! TD

2014/1/20 Eduardo Costa Alfaia e.costaalf...@studenti.unibs.it
Hi guys, can somebody help me? Where do I change the print() function to print more than 10 lines on screen? Is there a way to print the total count of all words in a batch? Best Regards
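For reference, a sketch applying the same idea to the word-count stream in NetworkWordCount; the wordCounts name and its DStream[(String, Int)] type are assumptions, not taken from the thread:

wordCounts.foreachRDD(rdd => {
  // Print the first 50 (word, count) pairs instead of print()'s default 10.
  rdd.take(50).foreach { case (word, count) => println(word + ": " + count) }
  // Total of all word occurrences in this batch (sum of the counts);
  // fold is used so an empty batch yields 0 instead of an error.
  println("Total words in batch: " + rdd.map(_._2).fold(0)(_ + _))
})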
Re: Spark writing to disk when there's enough memory?!
Hi, I've experimented with the parameters provided but we are still seeing the same problem: data is still spilling to disk when there's clearly enough memory on the worker nodes. Please note that data is distributed equally amongst the 6 Hadoop nodes (about 5 GB per node). Any workarounds or clues as to why this is still happening, please? Thanks, Majd
Spark Master on Hadoop Job Tracker?
Hi, should the Spark Master run on the Hadoop JobTracker node (and Spark workers on TaskTrackers), or could the Spark Master reside on any Hadoop node? Thanks, Majd
RE: SparkException: Expect only DirectTaskResults when using localScheduler()
Thank you, Patrick.

-----Original Message-----
From: Patrick Wendell [mailto:pwend...@gmail.com]
Sent: Friday, January 17, 2014 11:54 PM
To: user@spark.incubator.apache.org
Subject: Re: SparkException: Expect only DirectTaskResults when using localScheduler()

This is a bug that was fixed and will be part of 0.8.2: https://github.com/apache/incubator-spark/pull/281 A workaround is setting the akka frame size to be larger using spark.akka.frameSize. The issue is that we added a mechanism to circumvent akka for large task results, but the implementation assumed this would never happen in local mode. That assumption turned out to be wrong, so it was patched in #281. - Patrick

On Fri, Jan 17, 2014 at 9:53 PM, Nick Pentreath nick.pentre...@gmail.com wrote:
I'm also running into this issue. If it is related to a setting, then the error message should surely be cleaned up a bit to be more helpful in suggesting a fix? - Sent from Mailbox for iPhone

On Sat, Jan 18, 2014 at 2:35 AM, hussam_jar...@dell.com wrote:
Hi, can someone please explain what the following exception means: 'SparkException: Expect only DirectTaskResults when using localScheduler()'? Is this related to spark.akka.frameSize? I am running single-node local Spark 0.8.1. Thanks, Hussam
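For reference, a sketch of the workaround Patrick describes; the value 128 is an arbitrary assumption (spark.akka.frameSize is in MB in 0.8.x, default 10):

// Raise the Akka frame size before creating the SparkContext so large
// task results are not rejected in local mode.
System.setProperty("spark.akka.frameSize", "128")
val sc = new SparkContext("local", "frame-size-workaround")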
Re: Quality of documentation (rant)
Hi Ognen,

It’s true that the documentation is partly targeting Hadoop users, and that’s something we need to fix. Perhaps the best solution would be some kind of tutorial on “here’s how to set up Spark by hand on EC2”. However, it also sounds like you ran into some issues with S3 that it would be good to report separately. To answer the specific questions:

For example, the thing supports using S3 to get files but when you actually try to read a large file, it just sits there and sits there and eventually comes back with an error that really does not tell me anything (so the task was killed - why? there is nothing in the logs). So, do I actually need an HDFS setup over S3 so it can support block access? Who knows, I can't find anything.

This sounds like either a bug or somehow the S3 library requiring lots of memory to read a block. There isn’t a separate way to run HDFS over S3. Hadoop just has different implementations of “file systems”, one of which is S3. There’s a pointer to these versions at the bottom of http://spark.incubator.apache.org/docs/latest/ec2-scripts.html#accessing-data-in-s3 but it is indeed pretty hidden in the docs.

Even basic questions I have to ask on this list - does Spark support parallel reads from files in a shared filesystem? Someone answered - yes. Does this extend to S3? Who knows? Nowhere to be found. Does it extend to S3 only if used through HDFS? Who knows.

Everything in Hadoop and Spark is read in parallel, including S3.

Does Spark need a running Hadoop cluster to realize its full potential? Who knows, it is not stated explicitly anywhere but any time I google stuff people mention Hadoop.

Not unless you want to use HDFS.

Can Spark do EVERYTHING in standalone mode? The documentation is not explicit but it leads you to believe it can (or maybe I am overly optimistic?).

Yes, there’s no difference in what you can run on Spark in the different deployment modes. They’re just different ways to get tasks onto a cluster.

Anyway, these are really good questions as I said, since the docs kind of target a Hadoop audience. We can improve these both in the online docs and by having some kind of walk-throughs or tutorial. Do you have any suggestions on how you’d like the docs structured to show this stuff? E.g., should there be a separate section on S3, or different input sources?

One final thing — as someone mentioned, using Spark’s EC2 scripts to launch a cluster is not a bad idea. We’ve supported those scripts pretty much since Spark was released and they do a lot of the configuration for you. You can even pause/restart the cluster if you want, etc.

Matei
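For reference, a sketch of reading from S3 the way Matei describes, via Hadoop's s3n file-system implementation; the bucket, path, and key values are placeholders, and the hadoopConfiguration handle on SparkContext is an assumption (the same keys can equivalently go in core-site.xml):

// Hand the AWS credentials to the Hadoop layer, then read as usual.
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_KEY")
val lines = sc.textFile("s3n://my-bucket/path/to/file.txt")
println("Lines: " + lines.count())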
SPARK protocol buffer issue. Need Help
Hi, I'm new to Spark, and I was trying to read a file residing in HDFS and perform some basic actions on this dataset. See below the code I used:

object Hbase {
  def main(args: Array[String]) {
    val sc = new SparkContext("spark://servername:portno", "somename")
    val input = sc.textFile("hdfs://servername/user/cool/inputWrite.txt")
    input.count()
  }
}

Also see below the .sbt file content:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.9.3"
libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "0.8.0-incubating", "org.apache.hadoop" % "hadoop-client" % "2.0.4-alpha", "com.google.protobuf" % "protobuf-java" % "2.4.1" force())
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

When I do sbt run, I'm seeing the error below. Can someone help me resolve this issue?

java.io.IOException: Failed on local exception: com.google.protobuf.InvalidProtocolBufferException: Message missing required fields: callId, status; Host Details : local host is: ; destination host is: ;
  at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:761)
  at org.apache.hadoop.ipc.Client.call(Client.java:1239)
  at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
  at $Proxy12.getFileInfo(Unknown Source)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
  at java.lang.reflect.Method.invoke(Method.java:597)
  at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
  at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
  at $Proxy12.getFileInfo(Unknown Source)
  at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:630)
  at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1559)
  at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:811)
  at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1649)
  at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1595)
  at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:207)
  at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:251)
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:70)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
  at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:26)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:772)
  at org.apache.spark.rdd.RDD.count(RDD.scala:677)
  at Hbase$.main(Hbase.scala:7)
  at Hbase.main(Hbase.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
  at java.lang.reflect.Method.invoke(Method.java:597)
Caused by: com.google.protobuf.InvalidProtocolBufferException: Message missing required fields: callId, status
  at com.google.protobuf.UninitializedMessageException.asInvalidProtocolBufferException(UninitializedMessageException.java:81)
  at org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos$RpcResponseHeaderProto$Builder.buildParsed(RpcPayloadHeaderProtos.java:1094)
  at org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos$RpcResponseHeaderProto$Builder.access$1300(RpcPayloadHeaderProtos.java:1028)
  at org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos$RpcResponseHeaderProto.parseDelimitedFrom(RpcPayloadHeaderProtos.java:986)
  at org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:946)
  at org.apache.hadoop.ipc.Client$Connection.run(Client.java:844)
[trace] Stack trace suppressed: run last compile:run for the full output.
java.lang.RuntimeException: Nonzero exit code: 1
  at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last compile:run for the full output.
[error] (compile:run) Nonzero exit code: 1
[error] Total time: 3 s, completed Jan 20, 2014 12:57:32 PM

Regards, SB
Re: Quality of documentation (rant)
Hi Matei, thanks for replying!

On Mon, Jan 20, 2014 at 8:08 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
This sounds like either a bug or somehow the S3 library requiring lots of memory to read a block. There isn’t a separate way to run HDFS over S3. Hadoop just has different implementations of “file systems”, one of which is S3. There’s a pointer to these versions at the bottom of http://spark.incubator.apache.org/docs/latest/ec2-scripts.html#accessing-data-in-s3 but it is indeed pretty hidden in the docs.

Hmmm. Maybe a bug then. If I read a small 600-byte file via the s3n:// URI, it works on a Spark cluster. If I try a 20GB file it just sits and sits and sits, frozen. Is there anything I can do to instrument this and figure out what is going on?

Everything in Hadoop and Spark is read in parallel, including S3.

OK, good to know!

Not unless you want to use HDFS.

Ahh, OK. I don't particularly want HDFS but I suspect I will need it since it seems to be the only free distributed parallel FS. I suspect running it over EBS volumes is probably as slow as molasses, though. Right now the s3:// freezing bug is a show stopper for me and I am considering putting the ephemeral storage on all the nodes in the Spark cluster into some kind of distributed file system like GPFS or Lustre or https://code.google.com/p/mogilefs/ to provide a shared file system for all the nodes. It is next to impossible to find online what the standard practices in the industry are for this kind of setup, so I guess I am going to set my own industry standards ;)

Anyway, these are really good questions as I said, since the docs kind of target a Hadoop audience. We can improve these both in the online docs and by having some kind of walk-throughs or tutorial. Do you have any suggestions on how you’d like the docs structured to show this stuff? E.g. should there be a separate section on S3, or different input sources?

Not sure. For starters it would be nice to document the real use cases. I am more than happy (and I think the people I work for are happy too) to document the pipeline I am setting up. In the process I have found that the industry is remarkably tight-lipped as to how to do these things in practice. For example, what if you want to expose a point on the internet where you can send millions of data points into your firehose? What do you use? How? I have people recommending Kafka, but even those people don't exactly say HOW. I have gone the route of elastic load balancing with autoscaling, exposing a bunch of mongrel2 instances running zeromq handlers that ingest data and then bounce it into S3 for persistence and into a Spark cluster for real-time analytics but also for post-fact analytics. While I have demonstrated the whole pipeline on a toy example, I am now trying to test it in real life with historic data that we have from our previous data provider - about 1-2 TB of data so far in 20-30GB files. Unfortunately I have not been able to get past the f = textFile("s3://something"); f.count basic test on a 20GB file on Amazon S3. I have a test cluster of about 16 m1.xlarge instances that is just sitting there spinning :)

One final thing — as someone mentioned, using Spark’s EC2 scripts to launch a cluster is not a bad idea. We’ve supported those scripts pretty much since Spark was released and they do a lot of the configuration for you. You can even pause/restart the cluster if you want, etc.

Yes, but things get complicated in people's setups. I run mine in a VPC that exposes only one point of entry - the elastic load balancer that takes the traffic from the outside and sends it to the inside of the VPC where the analytics/Spark live. I imagine this would be a common use scenario for a company that has millions of
Re: SPARK protocol buffer issue. Need Help
Every time I see the magic words ... InvalidProtocolBufferException: Message missing required fields: callId, status ... it indicates that a client of something is using protobuf 2.4 and the server is using protobuf 2.5. Here you are using protobuf 2.4, check. And I suppose you are using HDFS from a Hadoop 2.2.x distribution? That uses protobuf 2.5. While I suspect that is the cause, others here might actually have a solution. Can you force protobuf 2.5 instead of 2.4? I am aware of a different build profile for YARN which might help too. -- Sean Owen | Director, Data Science | London

On Mon, Jan 20, 2014 at 9:05 PM, suman bharadwaj suman@gmail.com wrote:
Hi, I'm new to Spark, and I was trying to read a file residing in HDFS and perform some basic actions on this dataset. See below the code I used:

object Hbase {
  def main(args: Array[String]) {
    val sc = new SparkContext("spark://servername:portno", "somename")
    val input = sc.textFile("hdfs://servername/user/cool/inputWrite.txt")
    input.count()
  }
}

Also see below the .sbt file content:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.9.3"
libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "0.8.0-incubating", "org.apache.hadoop" % "hadoop-client" % "2.0.4-alpha", "com.google.protobuf" % "protobuf-java" % "2.4.1" force())
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

When I do sbt run, I'm seeing the error below (java.io.IOException: Failed on local exception: com.google.protobuf.InvalidProtocolBufferException: Message missing required fields: callId, status). Can someone help me resolve this issue?
Re: Lzo + Protobuf
Any suggestions, anyone? Core team / contributors / spark developers - any thoughts?

On Jan 17, 2014, at 4:45 PM, Vipul Pandey vipan...@gmail.com wrote:
Hi All, can someone please share (sample) code to read LZO-compressed protobufs from HDFS (using elephant-bird)? I'm trying whatever I see in the forum and on the web but it doesn't seem comprehensive to me. I'm using Spark 0.8.0. My Pig scripts are able to read protobuf just fine, so the Hadoop layer is set up alright. It would be really helpful if someone could list out what needs to be done with/in Spark. ~Vipul
Gathering exception stack trace
Hi all, I'm having a hard time trying to find ways to report exceptions that happen during computation to the end user of the Spark system, without having them ssh into the worker nodes or access the Spark UI. For example, if some exception happens in the code that runs on worker nodes (e.g., an IllegalStateException due to wrong user input), SparkContext only shows the following vague exception, and I'd have to dig into the worker node to get the actual exception.

Exception saving /tmp/data/dTableIL.dtconfig: org.apache.spark.SparkException: Job failed: Task 343.0:8 failed more than 4 times
org.apache.spark.SparkException: Job failed: Task 343.0:8 failed more than 4 times
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
  at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
  at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)

Is there any way to forward the exception to SparkContext? If not, what are some workarounds that can mitigate the problem here? Thanks in advance! Mingyu
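For reference, one common workaround (not from the thread) is to catch exceptions inside the closure and return them as data, so the driver can inspect them; a sketch, where compute is a hypothetical stand-in for the real per-record work:

def compute(x: Int): Int = 100 / x  // stand-in; throws on bad input (x = 0)

val results = rdd.map { x =>
  try Right(compute(x))                            // normal result
  catch { case e: Exception => Left(e.toString) }  // captured failure
}
// Pull a sample of failure messages back to the driver.
results.filter(_.isLeft).take(10).foreach(println)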
Re: SPARK protocol buffer issue. Need Help
Hi Sean, thanks. You are right: the SPARK_HOME lib_managed folder has a different protocol buffer version jar than /usr/lib/hadoop/lib. In the hadoop lib I have version 2.4.0a and in lib_managed I have version 2.4.1, which, as you said, is conflicting. I'm really new to Spark and Scala as well. I did the following:

libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "0.8.0-incubating", "org.apache.hadoop" % "hadoop-client" % "2.0.4-alpha", "com.google.protobuf" % "protobuf-java" % "2.4.0a" force())

But this doesn't seem to be working; I get the same error. I really don't know how to force Spark to use 2.4.0a. Any ideas? Regards, SB

On 21 January 2014 03:15, Sean Owen so...@cloudera.com wrote:
Every time I see the magic words ... InvalidProtocolBufferException: Message missing required fields: callId, status ... it indicates that a client of something is using protobuf 2.4 and the server is using protobuf 2.5. Here you are using protobuf 2.4, check. And I suppose you are using HDFS from a Hadoop 2.2.x distribution? That uses protobuf 2.5. While I suspect that is the cause, others here might actually have a solution. Can you force protobuf 2.5 instead of 2.4? I am aware of a different build profile for YARN which might help too. -- Sean Owen | Director, Data Science | London
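For reference, a sketch of one way to force a single protobuf version across the build in sbt; the version is the one from the thread, and whether this resolves the runtime conflict with the cluster's Hadoop jars is untested:

// dependencyOverrides wins over transitively-pulled versions,
// unlike force() on a single direct dependency.
dependencyOverrides += "com.google.protobuf" % "protobuf-java" % "2.4.0a"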
Re: Quality of documentation (rant)
Hmmm. Maybe a bug then. If I read a small 600-byte file via the s3n:// URI, it works on a Spark cluster. If I try a 20GB file it just sits and sits and sits, frozen. Is there anything I can do to instrument this and figure out what is going on?

Try taking a look at the stderr log of the executor that failed. You should hopefully see a more detailed error message there. The stderr logs can be found by browsing to http://mymaster:8080, where `mymaster` is the hostname of your Spark master. Hope that helps, -Jey
Re: Quality of documentation (rant)
Jey,

On Mon, Jan 20, 2014 at 10:59 PM, Jey Kottalam j...@cs.berkeley.edu wrote:
Try taking a look at the stderr log of the executor that failed. You should hopefully see a more detailed error message there. The stderr logs can be found by browsing to http://mymaster:8080, where `mymaster` is the hostname of your Spark master.

Thanks. I will try that, but your assumption is that something is failing in an obvious way with a message. By the look of the spark-shell (just frozen), I would say something is stuck. Will report back. Thanks, Ognen
spark-shell on standalone cluster gives error no mesos in java.library.path
Hi, I deployed Spark 0.8.1 on a standalone cluster per https://spark.incubator.apache.org/docs/0.8.1/spark-standalone.html. When I start a spark-shell, I get the following error. I thought Mesos should not be required for a standalone cluster. Do I have to change any parameters in make-distribution.sh that I used to build the Spark distribution for this cluster? I left all at defaults (and noticed that the default Hadoop version is 1.0.4, which is not my Hadoop version - but I am not using Hadoop here).

Creating SparkContext...
Failed to load native Mesos library from
java.lang.UnsatisfiedLinkError: no mesos in java.library.path
  at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1738)
  at java.lang.Runtime.loadLibrary0(Runtime.java:823)
  at java.lang.System.loadLibrary(System.java:1028)
  at org.apache.mesos.MesosNativeLibrary.load(MesosNativeLibrary.java:52)
  at org.apache.mesos.MesosNativeLibrary.load(MesosNativeLibrary.java:64)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:260)
  at org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:862)
Re: spark-shell on standalone cluster gives error no mesos in java.library.path
Please ignore this error - I found the issue. Thanks!

On Mon, Jan 20, 2014 at 3:14 PM, Manoj Samel manojsamelt...@gmail.com wrote:
Hi, I deployed Spark 0.8.1 on a standalone cluster per https://spark.incubator.apache.org/docs/0.8.1/spark-standalone.html. When I start a spark-shell, I get the following error ('java.lang.UnsatisfiedLinkError: no mesos in java.library.path'). I thought Mesos should not be required for a standalone cluster. Do I have to change any parameters in make-distribution.sh that I used to build the Spark distribution for this cluster?
RE: Spark Master on Hadoop Job Tracker?
Not sure what you aim to solve. When you mention Spark Master, I guess you probably mean Spark standalone mode? In that case the Spark cluster is not necessarily coupled with the Hadoop cluster. If you aim to achieve better data locality, then yes, running Spark workers on HDFS data nodes might help. And for the Spark Master, I think it doesn't matter much. Best Regards, Raymond Liu

-----Original Message-----
From: mharwida [mailto:majdharw...@yahoo.com]
Sent: Tuesday, January 21, 2014 2:14 AM
To: user@spark.incubator.apache.org
Subject: Spark Master on Hadoop Job Tracker?

Hi, should the Spark Master run on the Hadoop JobTracker node (and Spark workers on TaskTrackers), or could the Spark Master reside on any Hadoop node? Thanks, Majd
Re: Error: Could not find or load main class org.apache.spark.executor.CoarseGrainedExecutorBackend
Hi Hussam, have you (1) generated the Spark jar using sbt/sbt assembly, and (2) distributed the Spark jar to the worker machines? It could be that the system expects the Spark jar to be present at /opt/spark-0.8.0/conf:/opt/spark-0.8.0/assembly/target/scala-2.9.3/spark-assembly_2.9.3-0.8.0-incubating-hadoop1.0.4.jar on one of the worker machines, but it's not finding the jar and hence not finding the necessary class. Can you double-check whether the jar exists in that location on all the worker nodes? TD

On Mon, Jan 20, 2014 at 4:44 PM, hussam_jar...@dell.com wrote:
Hi, I am using Spark 0.8.0 with Hadoop 1.2.1 in standalone cluster mode with 3 worker nodes and 1 master. Can someone help me with this error I am getting when running my app in a Spark cluster?

Error: Could not find or load main class org.apache.spark.executor.CoarseGrainedExecutorBackend

The command on the worker node is:

Spark Executor Command: java -cp :/opt/spark-0.8.0/conf:/opt/spark-0.8.0/assembly/target/scala-2.9.3/spark-assembly_2.9.3-0.8.0-incubating-hadoop1.0.4.jar -Dspark.local.dir=/home/hadoop/spark -Dspark.local.dir=/home/hadoop/spark -Dspark.local.dir=/home/hadoop/spark -Dspark.local.dir=/home/hadoop/spark -Xms49152M -Xmx49152M org.apache.spark.executor.CoarseGrainedExecutorBackend akka://spark@poc1:54483/user/CoarseGrainedScheduler 2 poc3 16

I checked the logs on the Spark master as well as the Spark workers, but there's not much info except the above error. Thanks, Hussam
Re: FileNotFoundException on distinct()?
Also, you will need to bounce the Spark services from a new ssh session to make the ulimit changes take effect (if you changed the value in /etc/limits). Sent from my mobile phone

On Jan 20, 2014 5:32 PM, Jey Kottalam j...@cs.berkeley.edu wrote:
Can you try ulimit -n to make sure the increased limit has taken effect?

On Monday, January 20, 2014, Ryan Compton compton.r...@gmail.com wrote:
I've got System.setProperty("spark.shuffle.consolidate.files", "true") but I'm getting the same error. The output of the distinct count will be 101,230,940 (I did it in Pig). I've got 13 nodes and each node allows 13,069,279 open files. So even with 1 record per file I think I've got enough. But what do the rest of you have for /proc/sys/fs/file-max?

On Sun, Jan 19, 2014 at 5:12 PM, Mark Hamstra m...@clearstorydata.com wrote:
You should try setting spark.shuffle.consolidate.files to true.

On Sun, Jan 19, 2014 at 4:49 PM, Ryan Compton compton.r...@gmail.com wrote:
I think I've shuffled this data before (I often join on it), and I know I was using distinct() in 0.7.3 for the same computation. What do people usually have in /proc/sys/fs/file-max? I'm really surprised that 13M isn't enough.

On Sat, Jan 18, 2014 at 11:47 PM, Mark Hamstra m...@clearstorydata.com wrote:
distinct() needs to do a shuffle -- which is resulting in the need to materialize the map outputs as files. count() doesn't.

On Sat, Jan 18, 2014 at 10:33 PM, Ryan Compton compton.r...@gmail.com wrote:
I'm able to open ~13M files. I expect the output of .distinct().count() to be under 100M; why do I need so many files open?

rfcompton@node19 ~ cat /etc/redhat-release
CentOS release 5.7 (Final)
rfcompton@node19 ~ cat /proc/sys/fs/file-max
13069279

On Sat, Jan 18, 2014 at 9:12 AM, Jey Kottalam j...@cs.berkeley.edu wrote:
The "too many open files" error is due to running out of available FDs, usually due to a limit set in the OS. The fix will depend on your specific OS, but under Linux it usually involves the fs.file-max sysctl.

On Fri, Jan 17, 2014 at 3:02 PM, Ryan Compton compton.r...@gmail.com wrote:
When I try .distinct() my jobs fail. Possibly related: https://groups.google.com/forum/#!topic/shark-users/j2TO-GINuFo

This works:

// get the node ids
val nodes = dupedKeyedEdgeList.map(x => x._1).cache()
// count the nodes
val numNodes = nodes.count()
logWarning("numNodes:\t" + numNodes)

This fails:

// get the node ids
val nodes = dupedKeyedEdgeList.map(x => x._1).cache()
// count the nodes
val numNodes = nodes.distinct().count()
logWarning("numNodes:\t" + numNodes)

with these stack traces:

14/01/17 14:54:37 WARN scripts.ComputeNetworkStats: numEdges: 915189977
14/01/17 14:54:37 INFO rdd.MappedRDD: Removing RDD 1 from persistence list
--
14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Loss was due to java.io.IOException
java.io.IOException: Filesystem closed
  at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:299)
  at org.apache.hadoop.hdfs.DFSClient.access$1100(DFSClient.java:77)
  at
RDD action hangs on a standalone mode cluster
Hi, I configured a Spark 0.8.1 cluster on AWS with one master node and 3 worker nodes. The cluster was configured as a standalone cluster using http://spark.incubator.apache.org/docs/latest/spark-standalone.html. The distribution was generated; the master was started on the master host with ./bin/start-master.sh. Then on each of the worker nodes, I cd'd into the spark-distro directory and did ./spark-class org.apache.spark.deploy.worker.Worker spark://IP:7077. In the browser, on the master's port 8080, I can see the 3 worker nodes ALIVE.

Next I start a spark shell on the master node with MASTER=spark://IPxxx:7077 ./spark-shell. In it I create a simple RDD on a local text file with a few lines and do countByKey(). The shell hangs. Ctrl-C gives:

scala> credit.countByKey()
java.lang.InterruptedException
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:485)
  at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:318)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:840)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:909)
  at org.apache.spark.rdd.RDD.reduce(RDD.scala:654)
  at org.apache.spark.rdd.RDD.countByValue(RDD.scala:752)
  at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:198)

Note - the same works in a local shell (without a master). Any pointers? Do I have to set any other network/logins? Note I am *** NOT *** starting slaves from the master node (using bin/start-slaves.sh) and thus have not set up passwordless ssh login, etc.
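For reference, a minimal sketch of the repro described above; the file path and key extraction are assumptions. One thing worth checking in this setup: with a cluster master, a plain local path must exist at the same location on every worker node, since the tasks read it there rather than on the driver.

val credit = sc.textFile("/path/present/on/every/node/credit.txt")
               .map(line => (line.split(",")(0), 1))
credit.countByKey()   // hangs in the setup described above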
How to perform multi dimensional reduction in spark?
Hi, it seems Spark does not support nested RDDs, so I was wondering how Spark can handle multi-dimensional reductions. As an example, consider a dataset with rows of the form ((i, j), value), where i and j are long indexes and value is a double. How is it possible to first reduce the above RDD over j, and then reduce the results over i? Just to clarify, a Scala equivalent would look like this:

var results = 0
for (i <- 0 until I) {
  var jReduction = 0
  for (j <- 0 until J) {
    // Reduce over j
    jReduction = jReduction + rdd(i, j)
  }
  // Reduce over i
  results = results * jReduction
}
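For reference, a sketch of how such a two-level reduction is usually expressed without nested RDDs, assuming rdd: RDD[((Long, Long), Double)] and the sum-then-product semantics of the loop above:

// Inner reduction: re-key by i alone and sum over j.
val sumOverJ = rdd.map { case ((i, j), v) => (i, v) }
                  .reduceByKey(_ + _)
// Outer reduction: multiply the per-i sums together.
val result = sumOverJ.values.reduce(_ * _)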
Re: Spark Master on Hadoop Job Tracker?
If you intend to run Hadoop MapReduce and Spark on the same cluster concurrently, and you have enough memory on the JobTracker master, then you can run the Spark master (for standalone mode, as Raymond mentions) on the same node. This is not necessary, but more for convenience, so you only have to ssh into one master (usually I'd put the Hive/Shark server, Spark master, etc. on the same node). — Sent from Mailbox for iPhone

On Mon, Jan 20, 2014 at 8:14 PM, mharwida majdharw...@yahoo.com wrote:
Hi, should the Spark Master run on the Hadoop JobTracker node (and Spark workers on TaskTrackers), or could the Spark Master reside on any Hadoop node? Thanks, Majd
Re: get CPU Metrics from spark
Hi Tianshuo, your email went to spam for me, probably for others too :) Are you referring to total CPU usage information per task? Regards, Mayur

Mayur Rustagi
Ph: +919632149971
http://www.sigmoidanalytics.com
https://twitter.com/mayur_rustagi

On Fri, Jan 17, 2014 at 6:56 AM, tdeng td...@twitter.com wrote:
Hi, dear Spark users: currently I'm doing some benchmarking of Spark jobs. I was able to get metrics including HDFS bytes read/written by setting metrics.properties. My question is: is there a way I can get CPU time information from the metrics? Thanks! Best, Tianshuo