Re: Discourse: A proposed alternative to the Spark User list
That sounds good to me. Shall I open a JIRA / PR about updating the site community page?

On Fri, Jan 23, 2015 at 4:37 AM, Patrick Wendell patr...@databricks.com wrote: Hey Nick, So I think what we can do is encourage people to participate on the Stack Overflow topic, and this I think we can do on the Spark website as a first-class community resource for Spark. We should probably be spending more time on that site given its popularity. In terms of encouraging this explicitly *to replace* the ASF mailing list, that I think is harder to do. The ASF makes a lot of effort to host its own infrastructure that is neutral and not associated with any corporation, and by and large the ASF policy is to consider that the de facto forum of communication for any project. Personally, I wish the ASF would update this policy - for instance, by allowing the use of third-party lists or communication fora - provided that they allow exporting the conversation if those sites were to change course. However, the state of the art stands as such. - Patrick

On Wed, Jan 21, 2015 at 8:43 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Josh / Patrick, What do y'all think of the idea of promoting Stack Overflow as a place to ask questions over this list, as long as the questions fit SO's guidelines (how-to-ask, dont-ask)? The apache-spark tag is very active on there. Discussions of all types are still on-topic here, but when possible we want to encourage people to use SO. Nick

On Wed, Jan 21, 2015 at 8:37:05 AM, Jay Vyas jayunit100.apa...@gmail.com wrote: It's a very valid idea indeed, but... it's a tricky subject, since the entire ASF is run on mailing lists; hence there are so many different but equally sound ways of looking at this idea, which conflict with one another.

On Jan 21, 2015, at 7:03 AM, btiernay btier...@hotmail.com wrote: I think this is a really great idea for really opening up the discussions that happen here. Also, it would be nice to know why there doesn't seem to be much interest. Maybe I'm misunderstanding some nuance of Apache projects. Cheers
Installing Spark Standalone to a Cluster
Do I need to manually install and configure Hadoop before doing anything with Spark standalone?
Re: Exception: NoSuchMethodError: org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions
If you're running the test via sbt you can examine the classpath that sbt uses for the test (show runtime:full-classpath, or last run) -- I find this helps once too many includes and excludes interact.

On Thu, Jan 22, 2015 at 3:50 PM, Adrian Mocanu amoc...@verticalscope.com wrote: I use spark 1.1.0-SNAPSHOT and the test I'm running is in local mode. My test case uses org.apache.spark.streaming.TestSuiteBase.

val spark = "org.apache.spark" %% "spark-core" % "1.1.0-SNAPSHOT" % "provided" excludeAll(
val sparkStreaming = "org.apache.spark" % "spark-streaming_2.10" % "1.1.0-SNAPSHOT" % "provided" excludeAll(
val sparkCassandra = "com.tuplejump" % "calliope_2.10" % "0.9.0-C2-EA" exclude("org.apache.cassandra", "cassandra-all") exclude("org.apache.cassandra", "cassandra-thrift")
val casAll = "org.apache.cassandra" % "cassandra-all" % "2.0.3" intransitive()
val casThrift = "org.apache.cassandra" % "cassandra-thrift" % "2.0.3" intransitive()
val sparkStreamingFromKafka = "org.apache.spark" % "spark-streaming-kafka_2.10" % "0.9.1" excludeAll(

-----Original Message----- From: Sean Owen [mailto:so...@cloudera.com] Sent: January-22-15 11:39 AM To: Adrian Mocanu Cc: u...@spark.incubator.apache.org Subject: Re: Exception: NoSuchMethodError: org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions

NoSuchMethodError almost always means that you have compiled some code against one version of a library but are running against another. I wonder if you are including different versions of Spark in your project, or running against a cluster on an older version?

On Thu, Jan 22, 2015 at 3:57 PM, Adrian Mocanu amoc...@verticalscope.com wrote: Hi, I get this exception when I run a Spark test case on my local machine:

An exception or error caused a run to abort: org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions(Lorg/apache/spark/streaming/dstream/DStream;Lscala/reflect/ClassTag;Lscala/reflect/ClassTag;Lscala/math/Ordering;)Lorg/apache/spark/streaming/dstream/PairDStreamFunctions;
java.lang.NoSuchMethodError: org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions(Lorg/apache/spark/streaming/dstream/DStream;Lscala/reflect/ClassTag;Lscala/reflect/ClassTag;Lscala/math/Ordering;)Lorg/apache/spark/streaming/dstream/PairDStreamFunctions;

In my test case I have these Spark-related imports:

import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.TestSuiteBase
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions

-Adrian
Re: Discourse: A proposed alternative to the Spark User list
+1

On Fri, Jan 23, 2015 at 5:58 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: That sounds good to me. Shall I open a JIRA / PR about updating the site community page?
Installing Spark Standalone to a Cluster
I need someone to send me a snapshot of their conf/spark-env.sh file, because I don't understand how to set some vars like SPARK_MASTER, etc.
Large number of pyspark.daemon processes
Hey all, I am running into a problem where YARN kills containers for being over their memory allocation (which is about 8G for executors plus 6G for overhead), and I noticed that in those containers there are tons of pyspark.daemon processes hogging memory. Here's a snippet from a container with 97 pyspark.daemon processes. The total sum of RSS usage across all of these is 1,764,956 pages (i.e. 6.7GB on the system). Any ideas what's happening here and how I can get the number of pyspark.daemon processes back to a more reasonable count? 2015-01-23 15:36:53,654 INFO [Reporter] yarn.YarnAllocationHandler (Logging.scala:logInfo(59)) - Container marked as failed: container_1421692415636_0052_01_30. Exit status: 143. Diagnostics: Container [pid=35211,containerID=container_1421692415636_0052_01_30] is running beyond physical memory limits. Current usage: 14.9 GB of 14.5 GB physical memory used; 41.3 GB of 72.5 GB virtual memory used. Killing container. Dump of the process-tree for container_1421692415636_0052_01_30 : |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE |- 54101 36625 36625 35211 (python) 78 1 332730368 16834 python -m pyspark.daemon |- 52140 36625 36625 35211 (python) 58 1 332730368 16837 python -m pyspark.daemon |- 36625 35228 36625 35211 (python) 65 604 331685888 17694 python -m pyspark.daemon [...] Full output here: https://gist.github.com/skrasser/e3e2ee8dede5ef6b082c Thank you! -Sven -- http://sites.google.com/site/krasser/?utm_source=sig
Re: Results never return to driver | Spark Custom Reader
It looks to me like your executor actually crashed and didn't just finish properly. Can you check the executor log? It is available in the UI, or on the worker machine, under $SPARK_HOME/work/app-20150123155114-/6/stderr (unless you manually changed the work directory location, but in that case I'd assume you know where to find the log).

On Thu, Jan 22, 2015 at 10:54 PM, Harihar Nahak hna...@wynyardgroup.com wrote: Hi All, I wrote a custom reader to read a DB, and it is able to return key and value as expected, but after it finished it never returned to the driver. Here is the output of the worker log:

15/01/23 15:51:38 INFO worker.ExecutorRunner: Launch command: java -cp ::/usr/local/spark-1.2.0-bin-hadoop2.4/sbin/../conf:/usr/local/spark-1.2.0-bin-hadoop2.4/lib/spark-assembly-1.2.0-hadoop2.4.0.jar:/usr/local/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar:/usr/local/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/usr/local/hadoop/etc/hadoop -XX:MaxPermSize=128m -Dspark.driver.port=53484 -Xms1024M -Xmx1024M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@VM90:53484/user/CoarseGrainedScheduler 6 VM99 4 app-20150123155114- akka.tcp://sparkWorker@VM99:44826/user/Worker
15/01/23 15:51:47 INFO worker.Worker: Executor app-20150123155114-/6 finished with state EXITED message Command exited with code 1 exitStatus 1
15/01/23 15:51:47 WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@VM99:57695] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
15/01/23 15:51:47 INFO actor.LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkWorker/deadLetters] to Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%40143.96.25.29%3A35065-4#-915179653] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
15/01/23 15:51:49 INFO worker.Worker: Asked to kill unknown executor app-20150123155114-/6

If someone notices any clue to fix this, I will really appreciate it. - --Harihar
Re: Installing Spark Standalone to a Cluster
Not needed. You can directly follow the installation. However, you might need sbt to package your files into a jar.

On Fri, Jan 23, 2015 at 11:54 AM, riginos samarasrigi...@gmail.com wrote: Do I need to manually install and configure Hadoop before doing anything with Spark standalone?

-- Regards, Haripriya Ayyalasomayajula Graduate Student Department of Computer Science University of Houston Contact : 650-796-7112
Re: How to make spark partition sticky, i.e. stay with node?
Hello Mingyu, That is a reasonable way of doing this. Spark Streaming does not natively support "sticky" placement because Spark launches tasks based on data locality; if there is no locality (for example, reduce tasks can run anywhere), the location is randomly assigned. So the cogroup or join introduces locality, which forces the Spark scheduler to be sticky. Another way to achieve this is updateStateByKey, which internally uses cogroup but presents a nicer streaming-like API for per-key stateful operations. TD

On Fri, Jan 23, 2015 at 8:23 AM, mingyu mingyut...@gmail.com wrote: I found a workaround. I can make my auxiliary data an RDD, partition it, and cache it. Later, I can cogroup it with other RDDs, and Spark will try to keep the cached RDD partitions where they are and not shuffle them.
RE: spark 1.1.0 save data to hdfs failed
Thanks. I looked at the dependency tree. I did not see any dependent jar of hadoop-core from hadoop2. However, the jar built from Maven has the class org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.class. Do you know why?

Date: Fri, 23 Jan 2015 17:01:48 + Subject: RE: spark 1.1.0 save data to hdfs failed From: so...@cloudera.com To: eyc...@hotmail.com

Are you receiving my replies? I have suggested a resolution. Look at the dependency tree next.

On Jan 23, 2015 2:43 PM, ey-chih chow eyc...@hotmail.com wrote: I looked into the source code of SparkHadoopMapReduceUtil.scala. I think it is broken in the following code:

def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = {
  val klass = firstAvailableClass(
    "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl",  // hadoop2, hadoop2-yarn
    "org.apache.hadoop.mapreduce.TaskAttemptContext")           // hadoop1
  val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID])
  ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
}

In other words, it is related to hadoop2, hadoop2-yarn, and hadoop1. Any suggestion how to resolve it? Thanks.

From: so...@cloudera.com Date: Fri, 23 Jan 2015 14:01:45 + Subject: Re: spark 1.1.0 save data to hdfs failed To: eyc...@hotmail.com CC: user@spark.apache.org

These are all definitely symptoms of mixing incompatible versions of libraries. I'm not suggesting you haven't excluded Spark / Hadoop, but this is not the only way Hadoop deps get into your app. See my suggestion about investigating the dependency tree.

On Fri, Jan 23, 2015 at 1:53 PM, ey-chih chow eyc...@hotmail.com wrote: Thanks. But I think I already marked all the Spark and Hadoop deps as provided. Why is the cluster's version not used? Anyway, as I mentioned in the previous message, after changing the hadoop-client to version 1.2.1 in my Maven deps, I already pass the exception and go to another one as indicated below. Any suggestion on this?
spark 1.2 - Writing parquet fails for timestamp with Unsupported datatype TimestampType
Using Spark 1.2 Read a CSV file, apply schema to convert to SchemaRDD and then schemaRdd.saveAsParquetFile If the schema includes Timestamptype, it gives following trace when doing the save Exception in thread main java.lang.RuntimeException: Unsupported datatype TimestampType at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply( ParquetTypes.scala:343) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply( ParquetTypes.scala:292) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType( ParquetTypes.scala:291) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply( ParquetTypes.scala:363) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply( ParquetTypes.scala:362) at scala.collection.TraversableLike$$anonfun$map$1.apply( TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply( TraversableLike.scala:244) at scala.collection.mutable.ResizableArray$class.foreach( ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes( ParquetTypes.scala:361) at org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData( ParquetTypes.scala:407) at org.apache.spark.sql.parquet.ParquetRelation$.createEmpty( ParquetRelation.scala:166) at org.apache.spark.sql.parquet.ParquetRelation$.create( ParquetRelation.scala:145) at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply( SparkStrategies.scala:204) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply( QueryPlanner.scala:58) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply( QueryPlanner.scala:58) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply( QueryPlanner.scala:59) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute( SQLContext.scala:418) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan( SQLContext.scala:416) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute( SQLContext.scala:422) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan( SQLContext.scala:422) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute( SQLContext.scala:425) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425 ) at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile( SchemaRDDLike.scala:76) at org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:108) at bdrt.MyTest$.createParquetWithDate(MyTest.scala:88) at bdrt.MyTest$delayedInit$body.apply(MyTest.scala:54) at scala.Function0$class.apply$mcV$sp(Function0.scala:40) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.generic.TraversableForwarder$class.foreach( TraversableForwarder.scala:32) at scala.App$class.main(App.scala:71) at bdrt.MyTest$.main(MyTest.scala:10)
Starting a spark streaming app in init.d
Hello, I'm trying to kick off a Spark Streaming job to a standalone master using spark-submit inside of init.d. This is what I have:

DAEMON="spark-submit --class Streamer --executor-memory 500M --total-executor-cores 4 /path/to/assembly.jar"

start() {
    $DAEMON -p /var/run/my_assembly.pid
    echo OK
    return 0
}

However, this will return 0 even if spark-submit fails. Is there a way to run spark-submit in the background and return 0 only if it successfully starts up? Or better yet, is there something in spark-submit that will allow me to do this, perhaps via a command-line argument? Thanks, Ashic.
Spark Streaming action not triggered with Kafka inputs
I am running into some problems with Spark Streaming when reading from Kafka. I use Spark 1.2.0 built on CDH5. The example is based on: https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala

* It works with the default implementation:

val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

* However, when I changed it to parallel receiving, as shown below:

val topicMap = topics.split(",").map((_, 1)).toMap
val parallelInputs = (1 to numThreads.toInt) map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
}
ssc.union(parallelInputs)

After the change, the job stage just hangs there and never finishes. It looks like no action is triggered on the streaming job. When I check the Streaming tab, it shows the message below:

Batch Processing Statistics: No statistics have been generated yet.

Am I doing anything wrong on the parallel receiving part? -- Chen Song
Problems saving a large RDD (1 TB) to S3 as a sequence file
I've tried various ideas, but I'm really just shooting in the dark. I have an 8 node cluster of r3.8xlarge machines. The RDD (with 1024 partitions) I'm trying to save off to S3 is approximately 1TB in size (with the partitions pretty evenly distributed in size). I just tried a test to dial back the number of executors on my cluster from using the entire cluster (256 cores) down to 128. Things seemed to get a bit farther (maybe) before the wheels started spinning off again. But, the job always fails when all I'm trying to do is save the 1TB file to S3. I see the following in my master log file.

15/01/23 19:01:54 WARN master.Master: Removing worker-20150123172316 because we got no heartbeat in 60 seconds
15/01/23 19:01:54 INFO master.Master: Removing worker worker-20150123172316 on
15/01/23 19:01:54 INFO master.Master: Telling app of lost executor: 3

For the stage that eventually fails, I see the following summary information (min / 25th / median / 75th / max).

Summary Metrics for 729 Completed Tasks
Duration: 2.5 min / 4.8 min / 5.5 min / 6.3 min / 9.2 min
GC Time: 0 ms / 0.3 s / 0.4 s / 0.5 s / 5 s
Shuffle Read (Remote): 309.3 MB / 321.7 MB / 325.4 MB / 329.6 MB / 350.6 MB

So, the max GC was only 5s for 729 completed tasks. This sounds reasonable. As people tend to indicate GC is the reason one loses executors, this does not appear to be my case. Here is a typical snapshot for some completed tasks. So, you can see that they tend to complete in approximately 6 minutes. So, it takes about 6 minutes to write one partition to S3 (a partition being roughly 1 GB).

65 23619 0 SUCCESS ANY 5 / 2015/01/23 18:30:32 5.8 min 0.9 s 344.6 MB
59 23613 0 SUCCESS ANY 7 / 2015/01/23 18:30:32 6.0 min 0.4 s 324.1 MB
68 23622 0 SUCCESS ANY 1 / 2015/01/23 18:30:32 5.7 min 0.5 s 329.9 MB
62 23616 0 SUCCESS ANY 6 / 2015/01/23 18:30:32 5.8 min 0.7 s 326.4 MB
61 23615 0 SUCCESS ANY 3 / 2015/01/23 18:30:32 5.5 min 1 s 335.7 MB
64 23618 0 SUCCESS ANY 2 / 2015/01/23 18:30:32 5.6 min 2 s 328.1 MB

Then towards the end, when things start heading south, I see the following. These tasks never complete but you can see that they have taken more than 47 minutes (so far) before the job finally fails. Not really sure why.

671 24225 0 RUNNING ANY 1 / 2015/01/23 18:59:14 47 min
672 24226 0 RUNNING ANY 1 / 2015/01/23 18:59:14 47 min
673 24227 0 RUNNING ANY 1 / 2015/01/23 18:59:14 47 min
674 24228 0 RUNNING ANY 1 / 2015/01/23 18:59:14 47 min
675 24229 0 RUNNING ANY 1 / 2015/01/23 18:59:14 47 min
676 24230 0 RUNNING ANY 1 / 2015/01/23 18:59:14 47 min
677 24231 0 RUNNING ANY 1 / 2015/01/23 18:59:14 47 min
678 24232 0 RUNNING ANY 1 / 2015/01/23 18:59:14 47 min
679 24233 0 RUNNING ANY 1 / 2015/01/23 18:59:14 47 min
680 24234 0 RUNNING ANY 1 / 2015/01/23 18:59:17 47 min
681 24235 0 RUNNING ANY 1 / 2015/01/23 18:59:18 47 min
682 24236 0 RUNNING ANY 1 / 2015/01/23 18:59:18 47 min
683 24237 0 RUNNING ANY 5 / 2015/01/23 18:59:20 47 min
684 24238 0 RUNNING ANY 5 / 2015/01/23 18:59:20 47 min
685 24239 0 RUNNING ANY 5 / 2015/01/23 18:59:20 47 min
686 24240 0 RUNNING ANY 5 / 2015/01/23 18:59:20 47 min
687 24241 0 RUNNING ANY 5 / 2015/01/23 18:59:20 47 min
688 24242 0 RUNNING ANY 5 / 2015/01/23 18:59:20 47 min
689 24243 0 RUNNING ANY 5 / 2015/01/23 18:59:20 47 min
690 24244 0 RUNNING ANY 5 / 2015/01/23 18:59:20 47 min
691 24245 0 RUNNING ANY 5 / 2015/01/23 18:59:21 47 min

What's odd is that even on the same machine (see below) some tasks are still completing (in less than 5 minutes) while other tasks on the same machine seem to be hung after 46 minutes. Keep in mind all I'm doing is saving the file to S3, so one would think the amount of work per task/partition would be fairly equal.

694 24248 0 SUCCESS ANY 0 / 2015/01/23 18:59:32 4.5 min 0.3 s 326.5 MB
695 24249 0 SUCCESS ANY 0 / 2015/01/23 18:59:32 4.5 min 0.3 s 330.8 MB
696 24250 0 RUNNING ANY 0 / 2015/01/23 18:59:32 46 min
697 24251
Re: Discourse: A proposed alternative to the Spark User list
https://issues.apache.org/jira/browse/SPARK-5390

On Fri Jan 23 2015 at 12:05:00 PM Gerard Maas gerard.m...@gmail.com wrote: +1

On Fri, Jan 23, 2015 at 5:58 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: That sounds good to me. Shall I open a JIRA / PR about updating the site community page?
Re: Problems saving a large RDD (1 TB) to S3 as a sequence file
Hey Darin, Are you running this over EMR or as a standalone cluster? I've had occasional success in similar cases by digging through all executor logs and trying to find exceptions that are not caused by the application shutdown (but the logs remain my main pain point with Spark). That aside, another explanation could be S3 throttling you due to volume (and hence causing write requests to fail). You can try to split your file into multiple pieces and store those as S3 objects with different prefixes to make sure they end up in different partitions in S3. See here for details: http://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html. If that works, that'll narrow the cause down. Best, -Sven

On Fri, Jan 23, 2015 at 12:04 PM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote: I've tried various ideas, but I'm really just shooting in the dark. I have an 8 node cluster of r3.8xlarge machines. The RDD (with 1024 partitions) I'm trying to save off to S3 is approximately 1TB in size (with the partitions pretty evenly distributed in size).
Apache Spark standalone mode: number of cores
I'm trying to understand the basics of Spark internals, and the Spark documentation for submitting applications in local mode says, for the spark-submit --master setting:

local[K]: Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).
local[*]: Run Spark locally with as many worker threads as logical cores on your machine.

Since all the data is stored on a single local machine, it does not benefit from distributed operations on RDDs. How does it benefit, and what internally is going on, when Spark utilizes several logical cores?
Re: spark-shell has syntax error on windows.
Do you mind filing a JIRA issue for this which includes the actual error message string that you saw? https://issues.apache.org/jira/browse/SPARK

On Thu, Jan 22, 2015 at 8:31 AM, Yana Kadiyska yana.kadiy...@gmail.com wrote: I am not sure if you get the same exception as I do -- spark-shell2.cmd works fine for me. Windows 7 as well. I've never bothered looking to fix it as it seems spark-shell just calls spark-shell2 anyway...

On Thu, Jan 22, 2015 at 3:16 AM, Vladimir Protsenko protsenk...@gmail.com wrote: I have a problem with running the Spark shell on Windows 7. I made the following steps: 1. downloaded and installed Scala 2.11.5; 2. downloaded Spark 1.2.0 by git clone git://github.com/apache/spark.git; 3. ran dev/change-version-to-2.11.sh and mvn -Dscala-2.11 -DskipTests clean package (in git bash). After installation I tried to run spark-shell.cmd in a cmd shell and it says there is a syntax error in the file. What can I do to fix the problem?
Re: Apache Spark standalone mode: number of cores
The local mode still parallelizes calculations, and it is useful for debugging as it goes through the steps of serialization/deserialization as a cluster would.

On Fri, Jan 23, 2015 at 5:44 PM, olegshirokikh o...@solver.com wrote: I'm trying to understand the basics of Spark internals, and the Spark documentation for submitting applications in local mode says, for the spark-submit --master setting: local[K]: Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine). local[*]: Run Spark locally with as many worker threads as logical cores on your machine. Since all the data is stored on a single local machine, it does not benefit from distributed operations on RDDs. How does it benefit, and what internally is going on, when Spark utilizes several logical cores?
RE: How to 'Pipe' Binary Data in Apache Spark
Spark Committers: Please advise the way forward for this issue. Thanks for your support. Regards, Venkat

From: Venkat, Ankam Sent: Thursday, January 22, 2015 9:34 AM To: 'Frank Austin Nothaft'; 'user@spark.apache.org' Cc: 'Nick Allen' Subject: RE: How to 'Pipe' Binary Data in Apache Spark

How much time would it take to port it? Spark committers: Please let us know your thoughts. Regards, Venkat

From: Frank Austin Nothaft [mailto:fnoth...@berkeley.edu] Sent: Thursday, January 22, 2015 9:08 AM To: Venkat, Ankam Cc: Nick Allen; user@spark.apache.org Subject: Re: How to 'Pipe' Binary Data in Apache Spark

Venkat, No problem! "So, creating a custom InputFormat or using sc.binaryFiles alone is not the right solution. We also need the modified version of RDD.pipe to support binary data? Is my understanding correct?" Yep! That is correct. The custom InputFormat allows Spark to load binary formatted data from disk/HDFS/S3/etc..., but then the default RDD.pipe reads/writes text to a pipe, so you'd need the custom mapPartitions call. "If yes, this can be added as a new enhancement Jira request?" The code that I have right now is fairly custom to my application, but if there was interest, I would be glad to port it for the Spark core. Regards, Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466

On Jan 22, 2015, at 7:11 AM, Venkat, Ankam ankam.ven...@centurylink.com wrote: Thanks Frank for your response. So, creating a custom InputFormat or using sc.binaryFiles alone is not the right solution. We also need the modified version of RDD.pipe to support binary data? Is my understanding correct? If yes, this can be added as a new enhancement Jira request? Nick: What's your take on this? Regards, Venkat Ankam

From: Frank Austin Nothaft [mailto:fnoth...@berkeley.edu] Sent: Wednesday, January 21, 2015 12:30 PM To: Venkat, Ankam Cc: Nick Allen; user@spark.apache.org Subject: Re: How to 'Pipe' Binary Data in Apache Spark

Hi Venkat/Nick, The Spark RDD.pipe method pipes text data into a subprocess and then receives text data back from that process. Once you have the binary data loaded into an RDD properly, to pipe binary data to/from a subprocess (e.g., you want the data in the pipes to contain binary, not text), you need to implement your own, modified version of RDD.pipe. The implementation (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala) of RDD.pipe spawns a process per partition (IIRC), as well as threads for writing to and reading from the process (as well as stderr for the process). When writing via RDD.pipe, Spark calls *.toString on the object and pushes that text representation down the pipe. There is an example of how to pipe binary data from within a mapPartitions call using the Scala API in lines 107-177 of this file: https://github.com/bigdatagenomics/avocado/blob/master/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/ExternalGenotyper.scala. This specific code contains some nastiness around the packaging of downstream libraries that we rely on in that project, so I'm not sure if it is the cleanest way, but it is a workable way.

Regards, Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466

On Jan 21, 2015, at 9:17 AM, Venkat, Ankam ankam.ven...@centurylink.com wrote: I am trying to solve a similar problem. I am using option # 2 as suggested by Nick. I have created an RDD with sc.binaryFiles for a list of .wav files. But, I am not able to pipe it to the external programs. For example:

sq = sc.binaryFiles("wavfiles")  -- all .wav files stored in the "wavfiles" directory on HDFS
sq.keys().collect()  -- works fine. Shows the list of file names.
sq.values().collect()  -- works fine. Shows the content of the files.
sq.values().pipe(lambda x: subprocess.call(['/usr/local/bin/sox', '-t' 'wav', '-', '-n', 'stats'])).collect()  -- does not work. Tried different options.
AttributeError: 'function' object has no attribute 'read'

Any suggestions? Regards, Venkat Ankam

From: Nick Allen [mailto:n...@nickallen.org] Sent: Friday, January 16, 2015 11:46 AM To: user@spark.apache.org Subject: Re: How to 'Pipe' Binary Data in Apache Spark

I just wanted to reiterate the solution for the benefit of the community. The problem is not from my use of 'pipe', but that 'textFile' cannot be used to read in binary data. (Doh) There are a couple options to move forward. 1. Implement a custom 'InputFormat' that understands the binary input data. (Per Sean Owen) 2. Use 'SparkContext.binaryFiles' to read in the entire binary file as a single record. This will impact performance as it
Re: spark-shell has syntax error on windows.
https://issues.apache.org/jira/browse/SPARK-5389 I marked it as minor since I also just discovered that I can run it under PowerShell just fine. Vladimir, feel free to change the bug if you're getting a different message or a more serious issue.

On Fri, Jan 23, 2015 at 4:44 PM, Josh Rosen rosenvi...@gmail.com wrote: Do you mind filing a JIRA issue for this which includes the actual error message string that you saw? https://issues.apache.org/jira/browse/SPARK
Re: Problems saving a large RDD (1 TB) to S3 as a sequence file
Thanks for the ideas, Sven. I'm using a standalone cluster (Spark 1.2). FWIW, I was able to get this running (just now). This is the first time it's worked in probably my last 10 attempts. In addition to limiting the executors to only 50% of the cluster, in the settings below I additionally added/changed the following. Maybe I just got lucky (although I think not). Would be good if someone could weigh in and agree that these changes are sensible. I'm also hoping the support for placement groups (targeted for 1.3 in the ec2 scripts) will help the situation. All in all, it takes about 45 minutes to write a 1 TB file back to S3 (as 1024 partitions).

SparkConf conf = new SparkConf()
    .setAppName("SparkSync Application")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.rdd.compress", "true")
    .set("spark.core.connection.ack.wait.timeout", "600")
    .set("spark.akka.timeout", "600")        // Increased from 300
    .set("spark.akka.threads", "16")         // Added so that default was increased from 4 to 16
    .set("spark.task.maxFailures", "64")     // Didn't really matter as I had no failures in this run
    .set("spark.storage.blockManagerSlaveTimeoutMs", "30");
RE: spark 1.1.0 save data to hdfs failed
Thanks. But I think I already mark all the Spark and Hadoop reps as provided. Why the cluster's version is not used? Any way, as I mentioned in the previous message, after changing the hadoop-client to version 1.2.1 in my maven deps, I already pass the exception and go to another one as indicated below. Any suggestion on this? =Exception in thread main java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40) at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala) Caused by: java.lang.IncompatibleClassChangeError: Implementing class at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:800) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:449) at java.net.URLClassLoader.access$100(URLClassLoader.java:71) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:191) at org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.firstAvailableClass(SparkHadoopMapReduceUtil.scala:73) at org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.newTaskAttemptContext(SparkHadoopMapReduceUtil.scala:35) at org.apache.spark.rdd.PairRDDFunctions.newTaskAttemptContext(PairRDDFunctions.scala:53) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:932) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:832) at com.crowdstar.etl.ParseAndClean$.main(ParseAndClean.scala:103) at com.crowdstar.etl.ParseAndClean.main(ParseAndClean.scala)... 6 more From: so...@cloudera.com Date: Fri, 23 Jan 2015 10:41:12 + Subject: Re: spark 1.1.0 save data to hdfs failed To: eyc...@hotmail.com CC: user@spark.apache.org So, you should not depend on Hadoop artifacts unless you use them directly. You should mark Hadoop and Spark deps as provided. Then the cluster's version is used at runtime with spark-submit. That's the usual way to do it, which works. If you need to embed Spark in your app and are running it outside the cluster for some reason, and you have to embed Hadoop and Spark code in your app, the version has to match. You should also use mvn dependency:tree to see all the dependencies coming in. There may be many sources of a Hadoop dep. On Fri, Jan 23, 2015 at 1:05 AM, ey-chih chow eyc...@hotmail.com wrote: Thanks. 
But after I replace the maven dependency from

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.5.0-cdh5.2.0</version>
  <scope>provided</scope>
  <exclusions>
    <exclusion>
      <groupId>org.mortbay.jetty</groupId>
      <artifactId>servlet-api</artifactId>
    </exclusion>
    <exclusion>
      <groupId>javax.servlet</groupId>
      <artifactId>servlet-api</artifactId>
    </exclusion>
    <exclusion>
      <groupId>io.netty</groupId>
      <artifactId>netty</artifactId>
    </exclusion>
  </exclusions>
</dependency>

to

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>1.0.4</version>
  <scope>provided</scope>
  <exclusions>
    <exclusion>
      <groupId>org.mortbay.jetty</groupId>
      <artifactId>servlet-api</artifactId>
    </exclusion>
Re: spark 1.1.0 save data to hdfs failed
These are all definitely symptoms of mixing incompatible versions of libraries. I'm not suggesting you haven't excluded Spark / Hadoop, but, this is not the only way Hadoop deps get into your app. See my suggestion about investigating the dependency tree.
RE: spark 1.1.0 save data to hdfs failed
Sorry I still did not quiet get your resolution. In my jar, there are following three related classes: org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.classorg/apache/hadoop/mapreduce/task/TaskAttemptContextImpl$DummyReporter.classorg/apache/hadoop/mapreduce/TaskAttemptContext.class I think the first two come from hadoop2 and the third from hadoop1. I would like to get rid of the first two. I checked my source code. It does have a place using the class (or interface in hadoop2) TaskAttemptContext.Do you mean I make a separate jar for this portion of code and built with hadoop1 to get rid of dependency? An alternative way is to modify the code in SparkHadoopMapReduceUtil.scala and put it into my own source code to bypass the problem. Any comment on this? Thanks. From: eyc...@hotmail.com To: so...@cloudera.com CC: user@spark.apache.org Subject: RE: spark 1.1.0 save data to hdfs failed Date: Fri, 23 Jan 2015 11:17:36 -0800 Thanks. I looked at the dependency tree. I did not see any dependent jar of hadoop-core from hadoop2. However the jar built from maven has the class: org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.class Do you know why? Date: Fri, 23 Jan 2015 17:01:48 + Subject: RE: spark 1.1.0 save data to hdfs failed From: so...@cloudera.com To: eyc...@hotmail.com Are you receiving my replies? I have suggested a resolution. Look at the dependency tree next. On Jan 23, 2015 2:43 PM, ey-chih chow eyc...@hotmail.com wrote: I looked into the source code of SparkHadoopMapReduceUtil.scala. I think it is broken in the following code: def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = {val klass = firstAvailableClass( org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl, // hadoop2, hadoop2-yarnorg.apache.hadoop.mapreduce.TaskAttemptContext) // hadoop1val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID])ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext] } In other words, it is related to hadoop2, hadoop2-yarn, and hadoop1. Any suggestion how to resolve it? Thanks. From: so...@cloudera.com Date: Fri, 23 Jan 2015 14:01:45 + Subject: Re: spark 1.1.0 save data to hdfs failed To: eyc...@hotmail.com CC: user@spark.apache.org These are all definitely symptoms of mixing incompatible versions of libraries. I'm not suggesting you haven't excluded Spark / Hadoop, but, this is not the only way Hadoop deps get into your app. See my suggestion about investigating the dependency tree. On Fri, Jan 23, 2015 at 1:53 PM, ey-chih chow eyc...@hotmail.com wrote: Thanks. But I think I already mark all the Spark and Hadoop reps as provided. Why the cluster's version is not used? Any way, as I mentioned in the previous message, after changing the hadoop-client to version 1.2.1 in my maven deps, I already pass the exception and go to another one as indicated below. Any suggestion on this? 
= Exception in thread main java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40) at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala) Caused by: java.lang.IncompatibleClassChangeError: Implementing class at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:800) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:449) at java.net.URLClassLoader.access$100(URLClassLoader.java:71) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:191) at org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.firstAvailableClass(SparkHadoopMapReduceUtil.scala:73) at org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.newTaskAttemptContext(SparkHadoopMapReduceUtil.scala:35) at org.apache.spark.rdd.PairRDDFunctions.newTaskAttemptContext(PairRDDFunctions.scala:53) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:932) at
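A quick way to confirm which of the two mapreduce APIs actually wins on the runtime classpath is to probe for the same two classes that newTaskAttemptContext tries, the way firstAvailableClass does. A minimal sketch in plain Scala, using only the class names already quoted in this thread:
def present(name: String): Boolean =
  try { Class.forName(name); true } catch { case _: ClassNotFoundException => false }
println("hadoop2 TaskAttemptContextImpl visible: " +
  present("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl"))
println("hadoop1 TaskAttemptContext class visible: " +
  present("org.apache.hadoop.mapreduce.TaskAttemptContext"))
If both print true, the application jar (or the runtime classpath) is mixing hadoop1 and hadoop2 artifacts, which is consistent with the IncompatibleClassChangeError above.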
Re: Large number of pyspark.daemon processes
Hi Sven, What version of Spark are you running? Recent versions have a change that allows PySpark to share a pool of processes instead of starting a new one for each task. -Sandy On Fri, Jan 23, 2015 at 9:36 AM, Sven Krasser kras...@gmail.com wrote: Hey all, I am running into a problem where YARN kills containers for being over their memory allocation (which is about 8G for executors plus 6G for overhead), and I noticed that in those containers there are tons of pyspark.daemon processes hogging memory. Here's a snippet from a container with 97 pyspark.daemon processes. The total sum of RSS usage across all of these is 1,764,956 pages (i.e. 6.7GB on the system). Any ideas what's happening here and how I can get the number of pyspark.daemon processes back to a more reasonable count? 2015-01-23 15:36:53,654 INFO [Reporter] yarn.YarnAllocationHandler (Logging.scala:logInfo(59)) - Container marked as failed: container_1421692415636_0052_01_30. Exit status: 143. Diagnostics: Container [pid=35211,containerID=container_1421692415636_0052_01_30] is running beyond physical memory limits. Current usage: 14.9 GB of 14.5 GB physical memory used; 41.3 GB of 72.5 GB virtual memory used. Killing container. Dump of the process-tree for container_1421692415636_0052_01_30 : |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE |- 54101 36625 36625 35211 (python) 78 1 332730368 16834 python -m pyspark.daemon |- 52140 36625 36625 35211 (python) 58 1 332730368 16837 python -m pyspark.daemon |- 36625 35228 36625 35211 (python) 65 604 331685888 17694 python -m pyspark.daemon [...] Full output here: https://gist.github.com/skrasser/e3e2ee8dede5ef6b082c Thank you! -Sven -- http://sites.google.com/site/krasser/?utm_source=sig
Re: How to replay consuming messages from kafka using spark streaming?
Hi, I have written a Spark Streaming Kafka receiver using the Kafka simple consumer API: https://github.com/mykidong/spark-kafka-simple-consumer-receiver This receiver can be used as an alternative to the current Spark Streaming Kafka receiver, which is written against the high-level Kafka consumer API. With this receiver, Kafka message offsets can be controlled more easily for receiver worker node failure and driver node failure. - Kidong. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-replay-consuming-messages-from-kafka-using-spark-streaming-tp21145p21343.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Streaming: getting data from Cassandra based on input stream values
Hi, In that case, you can try the following. val joinRDD = kafkaStream.transform( streamRDD = { val ids = streamRDD.map(_._2).collect(); ids.map(userId = ctable.select(user_name).where(userid = ?, userId).toArray(0).get[String](0)) // better create a query which checks for all those ids at same time }) On Sat, Jan 24, 2015 at 3:32 AM, Greg Temchenko s...@dicefield.com wrote: Hi Madhu, Thanks for you response! But as I understand in this case you select all data from the Cassandra table. I don't wanna do it as it can be huge. I wanna just lookup some ids in the table. So it doesn't make sense for me how I can put some values from the streamRDD to the cassandra query (to where method). Greg On 1/23/15 1:11 AM, madhu phatak wrote: Hi, Seems like you want to get username for a give user id. You can use transform on the kafka stream to join two RDD's. The psuedo code looks like this val joinRDD = kafkaStream.transform( streamRDD = { streamRDD.map(value = (value._2,value._1)) join with (ctable.select(userid,username)) }) On Fri, Jan 23, 2015 at 10:12 AM, Greg Temchenko s...@dicefield.com wrote: Hi there, I think I have a basic question, but I'm sort of stuck with figuring out how to approach it, and I thought someone could point me to the right direction. I'd like pull some data from Cassandra based on values received from an input stream. Something like val ctable = ssc.cassandraTable(keyspace, users) val userNames = kafkaStream.flatMap { case (key,userid) = { val userName = ctable.select(user_name).where(userid = ?, userId).toArray(0).get[String](0) Some(userId, userName) } } While the Cassandra query works in Spark shell, it throws an exception when I used it inside flatMap: Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 46.0 failed 1 times, most recent failure: Lost task 0.0 in stage 46.0 (TID 35, localhost): java.lang.NullPointerException: org.apache.spark.rdd.RDD.init(RDD.scala:125) com.datastax.spark.connector.rdd.CassandraRDD.init(CassandraRDD.scala:49) com.datastax.spark.connector.rdd.CassandraRDD.copy(CassandraRDD.scala:83) com.datastax.spark.connector.rdd.CassandraRDD.select(CassandraRDD.scala:143) My understanding is that I cannot produce an RDD (Cassandra results) inside another RDD. But how should I approach the problem instead? Thanks, -- Greg -- Regards, Madhukara Phatak http://www.madhukaraphatak.com -- Greg -- Regards, Madhukara Phatak http://www.madhukaraphatak.com
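For reference, a rough sketch of that transform-based lookup, reusing the ctable and kafkaStream names from this thread (the per-id query is still one round trip each, so batching the ids into a single query remains the better option):
val joinedStream = kafkaStream.transform { streamRDD =>
  val ids = streamRDD.map(_._2).collect()          // the small batch of ids, collected on the driver
  val names = ids.map { userId =>
    (userId, ctable.select("user_name").where("userid = ?", userId).toArray(0).get[String](0))
  }
  streamRDD.sparkContext.parallelize(names.toSeq)  // hand the looked-up pairs back as an RDD
}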
Re: While Loop
Can you tell us the problem you're facing? Please see this thread: http://search-hadoop.com/m/JW1q5SsB5m Cheers On Fri, Jan 23, 2015 at 9:02 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, Is there a better programming construct than a while loop in Spark? Thank You
While Loop
Hi, Is there a better programming construct than a while loop in Spark? Thank You
Re: Large number of pyspark.daemon processes
Yarn only has the ability to kill not checkpoint or sig suspend. If you use too much memory it will simply kill tasks based upon the yarn config. https://issues.apache.org/jira/browse/YARN-2172 On Friday, January 23, 2015, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Sven, What version of Spark are you running? Recent versions have a change that allows PySpark to share a pool of processes instead of starting a new one for each task. -Sandy On Fri, Jan 23, 2015 at 9:36 AM, Sven Krasser kras...@gmail.com javascript:_e(%7B%7D,'cvml','kras...@gmail.com'); wrote: Hey all, I am running into a problem where YARN kills containers for being over their memory allocation (which is about 8G for executors plus 6G for overhead), and I noticed that in those containers there are tons of pyspark.daemon processes hogging memory. Here's a snippet from a container with 97 pyspark.daemon processes. The total sum of RSS usage across all of these is 1,764,956 pages (i.e. 6.7GB on the system). Any ideas what's happening here and how I can get the number of pyspark.daemon processes back to a more reasonable count? 2015-01-23 15:36:53,654 INFO [Reporter] yarn.YarnAllocationHandler (Logging.scala:logInfo(59)) - Container marked as failed: container_1421692415636_0052_01_30. Exit status: 143. Diagnostics: Container [pid=35211,containerID=container_1421692415636_0052_01_30] is running beyond physical memory limits. Current usage: 14.9 GB of 14.5 GB physical memory used; 41.3 GB of 72.5 GB virtual memory used. Killing container. Dump of the process-tree for container_1421692415636_0052_01_30 : |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE |- 54101 36625 36625 35211 (python) 78 1 332730368 16834 python -m pyspark.daemon |- 52140 36625 36625 35211 (python) 58 1 332730368 16837 python -m pyspark.daemon |- 36625 35228 36625 35211 (python) 65 604 331685888 17694 python -m pyspark.daemon [...] Full output here: https://gist.github.com/skrasser/e3e2ee8dede5ef6b082c Thank you! -Sven -- krasser http://sites.google.com/site/krasser/?utm_source=sig
Re: Aggregations based on sort order
I'm not sure about this, but I suspect the answer is: spark doesn't guarantee a stable sort, nor does it plan to in the future, so the implementation has more flexibility. But you might be interested in the work being done on secondary sort, which could give you the guarantees you want: https://issues.apache.org/jira/browse/SPARK-3655 On Jan 19, 2015 4:52 PM, justin.uang justin.u...@gmail.com wrote: Hi, I am trying to aggregate a key based on some timestamp, and I believe that spilling to disk is changing the order of the data fed into the combiner. I have some timeseries data that is of the form: (key, date, other data) Partition 1 (A, 2, ...) (B, 4, ...) (A, 1, ...) (A, 3, ...) (B, 6, ...) which I then partition by key, then sort within the partition: Partition 1 (A, 1, ...) (A, 2, ...) (A, 3, ...) (A, 4, ...) Partition 2 (B, 4, ...) (B, 6, ...) If I run a combineByKey with the same partitioner, then the items for each key will be fed into the ExternalAppendOnlyMap in the correct order. However, if I spill, then the time slices are spilled to disk as multiple partial combiners. When its time to merge the spilled combiners for each key, the combiners are combined in the wrong order. For example, if during a groupByKey, [(A, 1, ...), (A, 2...)] and [(A, 3, ...), (A, 4, ...)] are spilled separately, it's possible that the combiners can be combined in the wrong order, like [(A, 3, ...), (A, 4, ...), (A, 1, ...), (A, 2, ...)], which invalidates the invariant that all the values for A are passed in order to the combiners. I'm not an expert, but I suspect that this is because we use a heap ordered by key when iterating, which doesn't retain the order the spilled combiners. Perhaps we can order our mergeHeap by (hash_key, spill_index), where spill_index is incremented each time we spill? This would mean that we would pop and merge the combiners of each key in order, resulting in [(A, 1, ...), (A, 2, ...), (A, 3, ...), (A, 4, ...)]. Thanks in advance for the help! If there is a way to do this already in Spark 1.2, can someone point it out to me? Best, Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Aggregations-based-on-sort-order-tp21245.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
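For the secondary-sort route in 1.2, repartitionAndSortWithinPartitions plus a partitioner that hashes only on the primary key gives each partition whole keys with their records already in date order. A sketch, assuming (key, date, data) records where both the key and date types have an Ordering:
import org.apache.spark.Partitioner
class KeyOnlyPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(compositeKey: Any): Int = compositeKey match {
    case (key, _) => ((key.hashCode % numPartitions) + numPartitions) % numPartitions
  }
}
val sortedWithinKeys = rdd
  .map { case (key, date, data) => ((key, date), data) }   // composite sort key
  .repartitionAndSortWithinPartitions(new KeyOnlyPartitioner(rdd.partitions.length))
// a single mapPartitions pass can now fold each key's records in date order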
Re: Starting a spark streaming app in init.d
I'd do the same but put an extra condition to check whether the job has successfully started or not by checking the application ui (port availability 4040 would do, if you want more complex one then write a parser for the same.) after putting the main script on sleep for some time (say 2 minutes). Thanks Best Regards On Sat, Jan 24, 2015 at 1:57 AM, Ashic Mahtab as...@live.com wrote: Hello, I'm trying to kick off a spark streaming job to a stand alone master using spark submit inside of init.d. This is what I have: DAEMON=spark-submit --class Streamer --executor-memory 500M --total-executor-cores 4 /path/to/assembly.jar start() { $DAEMON -p /var/run/my_assembly.pid echo OK return 0 } However, will return 0 even if spark_submit fails. Is there a way to run spark-submit in the background and return 0 only if it successfully starts up? Or better yet, is there something in spark-submit that will allow me to do this, perhaps via a command line argument? Thanks, Ashic.
Re: Large number of pyspark.daemon processes
Hey Adam, I'm not sure I understand just yet what you have in mind. My takeaway from the logs is that the container actually was above its allotment of about 14G. Since 6G of that are for overhead, I assumed there to be plenty of space for Python workers, but there seem to be more of those than I'd expect. Does anyone know if that is actually the intended behavior, i.e. in this case over 90 Python processes on a 2 core executor? Best, -Sven On Fri, Jan 23, 2015 at 10:04 PM, Adam Diaz adam.h.d...@gmail.com wrote: Yarn only has the ability to kill not checkpoint or sig suspend. If you use too much memory it will simply kill tasks based upon the yarn config. https://issues.apache.org/jira/browse/YARN-2172 On Friday, January 23, 2015, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Sven, What version of Spark are you running? Recent versions have a change that allows PySpark to share a pool of processes instead of starting a new one for each task. -Sandy On Fri, Jan 23, 2015 at 9:36 AM, Sven Krasser kras...@gmail.com wrote: Hey all, I am running into a problem where YARN kills containers for being over their memory allocation (which is about 8G for executors plus 6G for overhead), and I noticed that in those containers there are tons of pyspark.daemon processes hogging memory. Here's a snippet from a container with 97 pyspark.daemon processes. The total sum of RSS usage across all of these is 1,764,956 pages (i.e. 6.7GB on the system). Any ideas what's happening here and how I can get the number of pyspark.daemon processes back to a more reasonable count? 2015-01-23 15:36:53,654 INFO [Reporter] yarn.YarnAllocationHandler (Logging.scala:logInfo(59)) - Container marked as failed: container_1421692415636_0052_01_30. Exit status: 143. Diagnostics: Container [pid=35211,containerID=container_1421692415636_0052_01_30] is running beyond physical memory limits. Current usage: 14.9 GB of 14.5 GB physical memory used; 41.3 GB of 72.5 GB virtual memory used. Killing container. Dump of the process-tree for container_1421692415636_0052_01_30 : |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE |- 54101 36625 36625 35211 (python) 78 1 332730368 16834 python -m pyspark.daemon |- 52140 36625 36625 35211 (python) 58 1 332730368 16837 python -m pyspark.daemon |- 36625 35228 36625 35211 (python) 65 604 331685888 17694 python -m pyspark.daemon [...] Full output here: https://gist.github.com/skrasser/e3e2ee8dede5ef6b082c Thank you! -Sven -- krasser http://sites.google.com/site/krasser/?utm_source=sig -- http://sites.google.com/site/krasser/?utm_source=sig
Re: Large number of pyspark.daemon processes
It should be a bug, the Python worker did not exit normally, could you file a JIRA for this? Also, could you show how to reproduce this behavior? On Fri, Jan 23, 2015 at 11:45 PM, Sven Krasser kras...@gmail.com wrote: Hey Adam, I'm not sure I understand just yet what you have in mind. My takeaway from the logs is that the container actually was above its allotment of about 14G. Since 6G of that are for overhead, I assumed there to be plenty of space for Python workers, but there seem to be more of those than I'd expect. Does anyone know if that is actually the intended behavior, i.e. in this case over 90 Python processes on a 2 core executor? Best, -Sven On Fri, Jan 23, 2015 at 10:04 PM, Adam Diaz adam.h.d...@gmail.com wrote: Yarn only has the ability to kill not checkpoint or sig suspend. If you use too much memory it will simply kill tasks based upon the yarn config. https://issues.apache.org/jira/browse/YARN-2172 On Friday, January 23, 2015, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Sven, What version of Spark are you running? Recent versions have a change that allows PySpark to share a pool of processes instead of starting a new one for each task. -Sandy On Fri, Jan 23, 2015 at 9:36 AM, Sven Krasser kras...@gmail.com wrote: Hey all, I am running into a problem where YARN kills containers for being over their memory allocation (which is about 8G for executors plus 6G for overhead), and I noticed that in those containers there are tons of pyspark.daemon processes hogging memory. Here's a snippet from a container with 97 pyspark.daemon processes. The total sum of RSS usage across all of these is 1,764,956 pages (i.e. 6.7GB on the system). Any ideas what's happening here and how I can get the number of pyspark.daemon processes back to a more reasonable count? 2015-01-23 15:36:53,654 INFO [Reporter] yarn.YarnAllocationHandler (Logging.scala:logInfo(59)) - Container marked as failed: container_1421692415636_0052_01_30. Exit status: 143. Diagnostics: Container [pid=35211,containerID=container_1421692415636_0052_01_30] is running beyond physical memory limits. Current usage: 14.9 GB of 14.5 GB physical memory used; 41.3 GB of 72.5 GB virtual memory used. Killing container. Dump of the process-tree for container_1421692415636_0052_01_30 : |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE |- 54101 36625 36625 35211 (python) 78 1 332730368 16834 python -m pyspark.daemon |- 52140 36625 36625 35211 (python) 58 1 332730368 16837 python -m pyspark.daemon |- 36625 35228 36625 35211 (python) 65 604 331685888 17694 python -m pyspark.daemon [...] Full output here: https://gist.github.com/skrasser/e3e2ee8dede5ef6b082c Thank you! -Sven -- krasser -- http://sites.google.com/site/krasser/?utm_source=sig - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Problems saving a large RDD (1 TB) to S3 as a sequence file
Can you also try increasing the akka framesize? .set(spark.akka.frameSize,50) // Set it to a higher number Thanks Best Regards On Sat, Jan 24, 2015 at 3:58 AM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote: Thanks for the ideas Sven. I'm using stand-alone cluster (Spark 1.2). FWIW, I was able to get this running (just now). This is the first time it's worked in probably my last 10 attempts. In addition to limiting the executors to only 50% of the cluster. In the settings below, I additionally added/changed the following. Maybe, I just got lucky (although I think not). Would be good if someone could weigh in and agree that these changes are sensible. I'm also hoping the support for placement groups (targeted for 1.3 in the ec2 scripts) will help the situation. All in all, it takes about 45 minutes to write a 1 TB file back to S3 (as 1024 partitions). SparkConf conf = new SparkConf() .setAppName(SparkSync Application) .set(spark.serializer, org.apache.spark.serializer.KryoSerializer) .set(spark.rdd.compress,true) .set(spark.core.connection.ack.wait.timeout,600) .set(spark.akka.timeout,600)// Increased from 300 .set(spark.akka.threads,16) // Added so that default was increased from 4 to 16 .set(spark.task.maxFailures,64) // Didn't really matter as I had no failures in this run .set(spark.storage.blockManagerSlaveTimeoutMs,30); From: Sven Krasser kras...@gmail.com To: Darin McBeath ddmcbe...@yahoo.com Cc: User user@spark.apache.org Sent: Friday, January 23, 2015 5:12 PM Subject: Re: Problems saving a large RDD (1 TB) to S3 as a sequence file Hey Darin, Are you running this over EMR or as a standalone cluster? I've had occasional success in similar cases by digging through all executor logs and trying to find exceptions that are not caused by the application shutdown (but the logs remain my main pain point with Spark). That aside, another explanation could be S3 throttling you due to volume (and hence causing write requests to fail). You can try to split your file into multiple pieces and store those as S3 objects with different prefixes to make sure they end up in different partitions in S3. See here for details: http://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html. If that works, that'll narrow the cause down. Best, -Sven On Fri, Jan 23, 2015 at 12:04 PM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote: I've tried various ideas, but I'm really just shooting in the dark. I have an 8 node cluster of r3.8xlarge machines. The RDD (with 1024 partitions) I'm trying to save off to S3 is approximately 1TB in size (with the partitions pretty evenly distributed in size). I just tried a test to dial back the number of executors on my cluster from using the entire cluster (256 cores) down to 128. Things seemed to get a bit farther (maybe) before the wheels started spinning off again. But, the job always fails when all I'm trying to do is save the 1TB file to S3. I see the following in my master log file. 15/01/23 19:01:54 WARN master.Master: Removing worker-20150123172316 because we got no heartbeat in 60 seconds 15/01/23 19:01:54 INFO master.Master: Removing worker worker-20150123172316 on 15/01/23 19:01:54 INFO master.Master: Telling app of lost executor: 3 For the stage that eventually fails, I see the following summary information. 
Summary Metrics for 729 Completed Tasks Duration 2.5 min 4.8 min 5.5 min 6.3 min 9.2 min GC Time 0 ms 0.3 s 0.4 s 0.5 s 5 s Shuffle Read (Remote) 309.3 MB 321.7 MB 325.4 MB 329.6 MB 350.6 MB So, the max GC was only 5s for 729 completed tasks. This sounds reasonable. As people tend to indicate GC is the reason one loses executors, this does not appear to be my case. Here is a typical snapshot for some completed tasks. So, you can see that they tend to complete in approximately 6 minutes. So, it takes about 6 minutes to write one partition to S3 (a partition being roughly 1 GB) 65 23619 0 SUCCESS ANY 5 / 2015/01/23 18:30:32 5.8 min 0.9 s 344.6 MB 59 23613 0 SUCCESS ANY 7 / 2015/01/23 18:30:32 6.0 min 0.4 s 324.1 MB 68 23622 0 SUCCESS ANY 1 / 2015/01/23 18:30:32 5.7 min 0.5 s 329.9 MB 62 23616 0 SUCCESS ANY 6 / 2015/01/23 18:30:32 5.8 min 0.7 s 326.4 MB 61 23615 0 SUCCESS ANY 3 / 2015/01/23 18:30:32 5.5 min 1 s 335.7 MB 64 23618 0 SUCCESS ANY 2 / 2015/01/23 18:30:32 5.6 min 2 s 328.1 MB Then towards the end, when things start heading south, I see the following. These tasks never complete but you can see that they have taken more than 47 minutes (so far) before the job finally fails. Not
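Picking up the frame-size suggestion at the top of this thread, the same settings in Scala form would look roughly like this (values as quoted above, not recommendations):
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .setAppName("SparkSync Application")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.rdd.compress", "true")
  .set("spark.core.connection.ack.wait.timeout", "600")
  .set("spark.akka.timeout", "600")
  .set("spark.akka.threads", "16")
  .set("spark.task.maxFailures", "64")
  .set("spark.akka.frameSize", "50")   // the suggested increase over the 10 MB default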
Re: Large number of pyspark.daemon processes
Hey Sandy, I'm using Spark 1.2.0. I assume you're referring to worker reuse? In this case I've already set spark.python.worker.reuse to false (but it I also so this behavior when keeping it enabled). Best, -Sven On Fri, Jan 23, 2015 at 4:51 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Sven, What version of Spark are you running? Recent versions have a change that allows PySpark to share a pool of processes instead of starting a new one for each task. -Sandy On Fri, Jan 23, 2015 at 9:36 AM, Sven Krasser kras...@gmail.com wrote: Hey all, I am running into a problem where YARN kills containers for being over their memory allocation (which is about 8G for executors plus 6G for overhead), and I noticed that in those containers there are tons of pyspark.daemon processes hogging memory. Here's a snippet from a container with 97 pyspark.daemon processes. The total sum of RSS usage across all of these is 1,764,956 pages (i.e. 6.7GB on the system). Any ideas what's happening here and how I can get the number of pyspark.daemon processes back to a more reasonable count? 2015-01-23 15:36:53,654 INFO [Reporter] yarn.YarnAllocationHandler (Logging.scala:logInfo(59)) - Container marked as failed: container_1421692415636_0052_01_30. Exit status: 143. Diagnostics: Container [pid=35211,containerID=container_1421692415636_0052_01_30] is running beyond physical memory limits. Current usage: 14.9 GB of 14.5 GB physical memory used; 41.3 GB of 72.5 GB virtual memory used. Killing container. Dump of the process-tree for container_1421692415636_0052_01_30 : |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE |- 54101 36625 36625 35211 (python) 78 1 332730368 16834 python -m pyspark.daemon |- 52140 36625 36625 35211 (python) 58 1 332730368 16837 python -m pyspark.daemon |- 36625 35228 36625 35211 (python) 65 604 331685888 17694 python -m pyspark.daemon [...] Full output here: https://gist.github.com/skrasser/e3e2ee8dede5ef6b082c Thank you! -Sven -- http://sites.google.com/site/krasser/?utm_source=sig -- http://sites.google.com/site/krasser/?utm_source=sig
Re: Spark Streaming action not triggered with Kafka inputs
ssc.union will return a DStream, you should do something like: val lines = ssc.union(parallelInputs) lines.print() Thanks Best Regards On Sat, Jan 24, 2015 at 12:55 AM, Chen Song chen.song...@gmail.com wrote: I am running into some problems with Spark Streaming when reading from Kafka.I used Spark 1.2.0 built on CDH5. The example is based on: https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala * It works with default implementation. val topicMap = topics.split(,).map((_,numThreads.toInt)).toMap val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) * However, when I changed it to parallel receiving, like shown below val topicMap = topics.split(,).map((_, 1)).toMap val parallelInputs = (1 to numThreads.toInt) map { _ = KafkaUtils .createStream(ssc, zkQuorum, group, topicMap) } ssc.union(parallelInputs) After the change, the job stage just hang there and never finish. It looks like no action is triggered on the streaming job. When I check the Streaming tab, it show messages below: Batch Processing Statistics No statistics have been generated yet. Am I doing anything wrong on the parallel receiving part? -- Chen Song
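Concretely, a minimal sketch of that change, reusing the topicMap, zkQuorum, group, and numThreads values from the snippet above:
import org.apache.spark.streaming.kafka.KafkaUtils
val parallelInputs = (1 to numThreads.toInt).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
}
val lines = ssc.union(parallelInputs)   // union returns a new DStream; keep the result
lines.map(_._2).print()                 // and attach an output operation so each batch actually runs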
Re: Installing Spark Standalone to a Cluster
Which variable is it that you don't understand? Here's a minimal spark-env.sh of mine. export SPARK_MASTER_IP=192.168.10.28 export HADOOP_CONF_DIR=/home/akhld/sigmoid/localcluster/hadoop/conf export HADOOP_HOME=/home/akhld/sigmoid/localcluster/hadoop/ Thanks Best Regards On Fri, Jan 23, 2015 at 11:50 PM, riginos samarasrigi...@gmail.com wrote: I need someone to send me a snapshot of his /conf/spark-env.sh file because I don't understand how to set some vars like SPARK_MASTER etc -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Installing-Spark-Standalone-to-a-Cluster-tp21341.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: spark 1.1.0 save data to hdfs failed
After I changed the dependency to the following: dependency groupIdorg.apache.hadoop/groupId artifactIdhadoop-client/artifactId version1.2.1/version exclusions exclusion groupIdorg.mortbay.jetty/groupId artifactIdservlet-api/artifactId /exclusion exclusion groupIdjavax.servlet/groupId artifactIdservlet-api/artifactId /exclusion exclusion groupIdio.netty/groupId artifactIdnetty/artifactId /exclusion /exclusions /dependency I got the following error. Any idea on this? Thanks. ===Caused by: java.lang.IncompatibleClassChangeError: Implementing class at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:800) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:449) at java.net.URLClassLoader.access$100(URLClassLoader.java:71) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:191) at org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.firstAvailableClass(SparkHadoopMapReduceUtil.scala:73) at org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.newTaskAttemptContext(SparkHadoopMapReduceUtil.scala:35) at org.apache.spark.rdd.PairRDDFunctions.newTaskAttemptContext(PairRDDFunctions.scala:53) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:932) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:832) at com.crowdstar.etl.ParseAndClean$.main(ParseAndClean.scala:103) at com.crowdstar.etl.ParseAndClean.main(ParseAndClean.scala) ... 6 moreFrom: eyc...@hotmail.com To: so...@cloudera.com CC: yuzhih...@gmail.com; user@spark.apache.org Subject: RE: spark 1.1.0 save data to hdfs failed Date: Thu, 22 Jan 2015 17:05:26 -0800 Thanks. But after I replace the maven dependence from dependency groupIdorg.apache.hadoop/groupId artifactIdhadoop-client/artifactId version2.5.0-cdh5.2.0/version scopeprovided/scope exclusions exclusion groupIdorg.mortbay.jetty/groupId artifactIdservlet-api/artifactId /exclusion exclusion groupIdjavax.servlet/groupId artifactIdservlet-api/artifactId /exclusion exclusion groupIdio.netty/groupId artifactIdnetty/artifactId /exclusion /exclusions/dependency todependency groupIdorg.apache.hadoop/groupId artifactIdhadoop-client/artifactId version1.0.4/version scopeprovided/scope exclusions exclusion groupIdorg.mortbay.jetty/groupId artifactIdservlet-api/artifactId /exclusion exclusion groupIdjavax.servlet/groupId artifactIdservlet-api/artifactId /exclusion exclusion groupIdio.netty/groupId artifactIdnetty/artifactId /exclusion /exclusions /dependency the warning message is still shown up in the namenode log. Is there any other thing I need to do? Thanks. Ey-Chih Chow From: so...@cloudera.com Date: Thu, 22 Jan 2015 22:34:22 + Subject: Re:
Re: processing large dataset
This is kinda a how-long-is-a-piece-of-string question. There is no one tuning for 'terabytes of data'. You can easily run a Spark job that processes hundreds of terabytes with no problem with defaults -- something trivial like counting. You can create Spark jobs that will never complete -- trying to pull the entire data set into a worker. You haven't said what you're doing exactly, although it sounds simple, and haven't said what the problem is? is it out of memory? that would be essential to know to say what if anything you need to change in your program or cluster. On Fri, Jan 23, 2015 at 4:52 AM, Kane Kim kane.ist...@gmail.com wrote: I'm trying to process 5TB of data, not doing anything fancy, just map/filter and reduceByKey. Spent whole day today trying to get it processed, but never succeeded. I've tried to deploy to ec2 with the script provided with spark on pretty beefy machines (100 r3.2xlarge nodes). Really frustrated that spark doesn't work out of the box for anything bigger than word count sample. One big problem is that defaults are not suitable for processing big datasets, provided ec2 script could do a better job, knowing instance type requested. Second it takes hours to figure out what is wrong, when spark job fails almost finished processing. Even after raising all limits as per https://spark.apache.org/docs/latest/tuning.html it still fails (now with: error communicating with MapOutputTracker). After all I have only one question - how to get spark tuned up for processing terabytes of data and is there a way to make this configuration easier and more transparent? Thanks. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: save a histogram to a file
Hi, the histogram method returns plain Scala types, not an RDD, so there is no saveAsTextFile on the result. You can use the makeRDD method to make an RDD out of the data and save it as an object file: val (buckets, counts) = a.histogram(10) val histRDD = sc.makeRDD(buckets.zip(counts)) histRDD.saveAsObjectFile(path) On Fri, Jan 23, 2015 at 5:37 AM, SK skrishna...@gmail.com wrote: Hi, histogram() returns an object that is a pair of Arrays. There appears to be no saveAsTextFile() for this paired object. Currently I am using the following to save the output to a file: val hist = a.histogram(10) val arr1 = sc.parallelize(hist._1).saveAsTextFile(file1) val arr2 = sc.parallelize(hist._2).saveAsTextFile(file2) Is there a simpler way to save the histogram() result to a file? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/save-a-histogram-to-a-file-tp21324.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Regards, Madhukara Phatak http://www.madhukaraphatak.com
Re: spark 1.1.0 save data to hdfs failed
So, you should not depend on Hadoop artifacts unless you use them directly. You should mark Hadoop and Spark deps as provided. Then the cluster's version is used at runtime with spark-submit. That's the usual way to do it, which works. If you need to embed Spark in your app and are running it outside the cluster for some reason, and you have to embed Hadoop and Spark code in your app, the version has to match. You should also use mvn dependency:tree to see all the dependencies coming in. There may be many sources of a Hadoop dep. On Fri, Jan 23, 2015 at 1:05 AM, ey-chih chow eyc...@hotmail.com wrote: Thanks. But after I replace the maven dependence from dependency groupIdorg.apache.hadoop/groupId artifactIdhadoop-client/artifactId version2.5.0-cdh5.2.0/version scopeprovided/scope exclusions exclusion groupIdorg.mortbay.jetty/groupId artifactIdservlet-api/artifactId /exclusion exclusion groupIdjavax.servlet/groupId artifactIdservlet-api/artifactId /exclusion exclusion groupIdio.netty/groupId artifactIdnetty/artifactId /exclusion /exclusions /dependency to dependency groupIdorg.apache.hadoop/groupId artifactIdhadoop-client/artifactId version1.0.4/version scopeprovided/scope exclusions exclusion groupIdorg.mortbay.jetty/groupId artifactIdservlet-api/artifactId /exclusion exclusion groupIdjavax.servlet/groupId artifactIdservlet-api/artifactId /exclusion exclusion groupIdio.netty/groupId artifactIdnetty/artifactId /exclusion /exclusions /dependency the warning message is still shown up in the namenode log. Is there any other thing I need to do? Thanks. Ey-Chih Chow - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Got java.lang.SecurityException: class javax.servlet.FilterRegistration's when running job from intellij Idea
Hi, I've exactly the same issue. I've tried to mark the libraries as 'provided' but then IntelliJ IDE seems to have deleted the libraries locallythat is I am not able to build/run the stuff in the IDE. Is the issue resolved ? I'm not very experienced in SBTI've tried to exclude the libraries: /name := SparkDemo version := 1.0 scalaVersion := 2.10.4 libraryDependencies += org.apache.spark %% spark-core % 1.2.0 exclude(org.apache.hadoop, hadoop-client) libraryDependencies += org.apache.spark % spark-sql_2.10 % 1.2.0 libraryDependencies += org.apache.hadoop % hadoop-common % 2.6.0 excludeAll( ExclusionRule(organization = org.eclipse.jetty)) libraryDependencies += org.apache.hadoop % hadoop-mapreduce-client-core % 2.6.0 libraryDependencies += org.apache.hbase % hbase-client % 0.98.4-hadoop2 libraryDependencies += org.apache.hbase % hbase-server % 0.98.4-hadoop2 libraryDependencies += org.apache.hbase % hbase-common % 0.98.4-hadoop2 mainClass in Compile := Some(demo.TruckEvents)/ but this does not work also. Thanks, Marco -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Got-java-lang-SecurityException-class-javax-servlet-FilterRegistration-s-when-running-job-from-intela-tp18035p21332.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Got java.lang.SecurityException: class javax.servlet.FilterRegistration's when running job from intellij Idea
Use mvn dependency:tree or sbt dependency-tree to print all of the dependencies. You are probably bringing in more servlet API libs from some other source? On Fri, Jan 23, 2015 at 10:57 AM, Marco marco@gmail.com wrote: Hi, I've exactly the same issue. I've tried to mark the libraries as 'provided' but then IntelliJ IDE seems to have deleted the libraries locallythat is I am not able to build/run the stuff in the IDE. Is the issue resolved ? I'm not very experienced in SBTI've tried to exclude the libraries: /name := SparkDemo version := 1.0 scalaVersion := 2.10.4 libraryDependencies += org.apache.spark %% spark-core % 1.2.0 exclude(org.apache.hadoop, hadoop-client) libraryDependencies += org.apache.spark % spark-sql_2.10 % 1.2.0 libraryDependencies += org.apache.hadoop % hadoop-common % 2.6.0 excludeAll( ExclusionRule(organization = org.eclipse.jetty)) libraryDependencies += org.apache.hadoop % hadoop-mapreduce-client-core % 2.6.0 libraryDependencies += org.apache.hbase % hbase-client % 0.98.4-hadoop2 libraryDependencies += org.apache.hbase % hbase-server % 0.98.4-hadoop2 libraryDependencies += org.apache.hbase % hbase-common % 0.98.4-hadoop2 mainClass in Compile := Some(demo.TruckEvents)/ but this does not work also. Thanks, Marco -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Got-java-lang-SecurityException-class-javax-servlet-FilterRegistration-s-when-running-job-from-intela-tp18035p21332.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
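If the tree shows the old servlet 2.x API riding in through the Hadoop or HBase artifacts (a common source of this particular FilterRegistration clash), one possible sbt-side exclusion is sketched below; adapt it to whichever artifact the dependency tree actually blames:
libraryDependencies += "org.apache.hbase" % "hbase-server" % "0.98.4-hadoop2" excludeAll(
  ExclusionRule(organization = "org.mortbay.jetty"),
  ExclusionRule(organization = "javax.servlet")
)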
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
Below is code I have written. I am getting NotSerializableException. How can I handle this scenario?
kafkaStream.foreachRDD(rdd => {
  println()
  rdd.foreachPartition(partitionOfRecords => {
    partitionOfRecords.foreach( record => {
      //Write for CSV.
      if (true == true) {
        val structType = table.schema
        val csvFile = ssc.sparkContext.textFile(record.toString())
        val rowRDD = csvFile.map(x => getMappedRowFromCsvRecord(structType, x))
      }
    })
-- Regards, Nishant
Re: save a histogram to a file
As you can see, the result of histogram() is a pair of arrays, since of course it's small. It's not necessary and in fact is huge overkill to make it back into an RDD so you can save it across a bunch of partitions. This isn't a job for Spark, but simple Scala code. Off the top of my head (maybe not 100% right): import java.io.PrintWriter val PrintWriter out = new PrintWriter(histogram.csv) startCount = hist._1.zip(hist._2).foreach { case (start, count) = out.println(start + , count) } out.close() On Fri, Jan 23, 2015 at 12:07 AM, SK skrishna...@gmail.com wrote: Hi, histogram() returns an object that is a pair of Arrays. There appears to be no saveAsTextFile() for this paired object. Currently I am using the following to save the output to a file: val hist = a.histogram(10) val arr1 = sc.parallelize(hist._1).saveAsTextFile(file1) val arr2 = sc.parallelize(hist._2).saveAsTextFile(file2) Is there a simpler way to save the histogram() result to a file? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/save-a-histogram-to-a-file-tp21324.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
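In runnable form, a minimal sketch of that plain-Scala approach, assuming hist holds the (buckets, counts) pair returned by histogram():
import java.io.PrintWriter
val out = new PrintWriter("histogram.csv")
hist._1.zip(hist._2).foreach { case (start, count) =>
  out.println(start + "," + count)
}
out.close()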
Re: reducing number of output files
It does not necessarily shuffle, yes. I believe it will not if you are strictly reducing the number of partitions, and do not force a shuffle. So I think the answer is 'yes'. If you have a huge number of small files, you can also consider wholeTextFiles, which gives you entire files of content in each element of the RDD. It is not necessarily helpful, but thought I'd mention it, as it could be of interest depending on what you do. On Fri, Jan 23, 2015 at 2:14 AM, Kane Kim kane.ist...@gmail.com wrote: Does it avoid reshuffling? I have 300 thousands output files. If I coalesce to the number of cores in the cluster would it keep data local? (I have 100 nodes, 4 cores each, does it mean if I coalesce(400) it will use all cores and data will stay local)? On Thu, Jan 22, 2015 at 3:26 PM, Sean Owen so...@cloudera.com wrote: One output file is produced per partition. If you want fewer, use coalesce() before saving the RDD. On Thu, Jan 22, 2015 at 10:46 PM, Kane Kim kane.ist...@gmail.com wrote: How I can reduce number of output files? Is there a parameter to saveAsTextFile? Thanks. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
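As a sketch, for the 100-node / 400-core layout described above (the output path is a placeholder):
rdd.coalesce(400)                       // no shuffle: partitions are merged on the nodes that already hold them
   .saveAsTextFile("hdfs:///path/to/output")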
Re: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
Heh, this question keeps coming up. You can't use a context or RDD inside a distributed operation, only from the driver. Here you're trying to call textFile from within foreachPartition. On Fri, Jan 23, 2015 at 10:59 AM, Nishant Patel nishant.k.pa...@gmail.com wrote: Below is code I have written. I am getting NotSerializableException. How can I handle this scenario? kafkaStream.foreachRDD(rdd = { println() rdd.foreachPartition(partitionOfRecords = { partitionOfRecords.foreach( record = { //Write for CSV. if (true == true) { val structType = table.schema val csvFile = ssc.sparkContext.textFile(record.toString()) val rowRDD = csvFile.map(x = getMappedRowFromCsvRecord(structType, x)) } }) -- Regards, Nishant - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
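One way to rearrange it so the context is only touched on the driver, reusing the names from the question (and assuming the collected list of records is small):
kafkaStream.foreachRDD { rdd =>
  val records = rdd.map(_.toString).collect()   // this closure runs on the driver, so collect is fine here
  records.foreach { record =>
    val csvFile = ssc.sparkContext.textFile(record)                          // legal: still driver-side
    val rowRDD = csvFile.map(x => getMappedRowFromCsvRecord(structType, x))  // helper from the question
    // ... write rowRDD out here
  }
}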
RE: spark 1.1.0 save data to hdfs failed
I looked into the source code of SparkHadoopMapReduceUtil.scala. I think it is broken in the following code: def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = {val klass = firstAvailableClass( org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl, // hadoop2, hadoop2-yarnorg.apache.hadoop.mapreduce.TaskAttemptContext) // hadoop1val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID])ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext] } In other words, it is related to hadoop2, hadoop2-yarn, and hadoop1. Any suggestion how to resolve it? Thanks. From: so...@cloudera.com Date: Fri, 23 Jan 2015 14:01:45 + Subject: Re: spark 1.1.0 save data to hdfs failed To: eyc...@hotmail.com CC: user@spark.apache.org These are all definitely symptoms of mixing incompatible versions of libraries. I'm not suggesting you haven't excluded Spark / Hadoop, but, this is not the only way Hadoop deps get into your app. See my suggestion about investigating the dependency tree. On Fri, Jan 23, 2015 at 1:53 PM, ey-chih chow eyc...@hotmail.com wrote: Thanks. But I think I already mark all the Spark and Hadoop reps as provided. Why the cluster's version is not used? Any way, as I mentioned in the previous message, after changing the hadoop-client to version 1.2.1 in my maven deps, I already pass the exception and go to another one as indicated below. Any suggestion on this? = Exception in thread main java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40) at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala) Caused by: java.lang.IncompatibleClassChangeError: Implementing class at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:800) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:449) at java.net.URLClassLoader.access$100(URLClassLoader.java:71) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:191) at org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.firstAvailableClass(SparkHadoopMapReduceUtil.scala:73) at org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.newTaskAttemptContext(SparkHadoopMapReduceUtil.scala:35) at org.apache.spark.rdd.PairRDDFunctions.newTaskAttemptContext(PairRDDFunctions.scala:53) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:932) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:832) at com.crowdstar.etl.ParseAndClean$.main(ParseAndClean.scala:103) at com.crowdstar.etl.ParseAndClean.main(ParseAndClean.scala) ... 6 more
Can't access nested types with sql
I try to work with nested parquet data. To read and write the parquet file is actually working now but when I try to query a nested field with SqlContext I get an exception: RuntimeException: Can't access nested field in type ArrayType(StructType(List(StructField(... I generate the parquet file by parsing the data into the following caseclass structure: case class areas(area : String, dates : Seq[Int]) case class dataset(userid : Long, source : Int, days : Seq[Int] , areas : Seq[areas] ) automatic generated schema: root |-- userid: long (nullable = false) |-- source: integer (nullable = false) |-- days: array (nullable = true) ||-- element: integer (containsNull = false) |-- areas: array (nullable = true) ||-- element: struct (containsNull = true) |||-- area: string (nullable = true) |||-- dates: array (nullable = true) ||||-- element: integer (containsNull = false) After writeing the Parquetfile I load the data again and I create a SQLContext and try to execute a sql-command like follows: parquetFile.registerTempTable(testtable) val result = sqlContext.sql(SELECT areas.area FROM testtable where userid 50) result.map(t = t(0)).collect().foreach(println) // throw the exception If I execute this command: val result = sqlContext.sql(SELECT areas[0].area FROM testtable where userid 50) I get only the values at the first position in the array but I need every value and that doesn't work. I sow the function t.getAs[...] but everything what I tried didn't worked. I hope somebody can help me how I can access a nested field that I read all values of the nested array or isn't it supported? I use spark-sql_2.10(v1.2.0), spark-core_2.10(v1.2.0) and parquet 1.6.0rc4. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-access-nested-types-with-sql-tp21336.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
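One workaround until the parser handles this is to select the whole array column and unpack it from the Row objects; a rough sketch against the schema above (in 1.2 the array of structs should come back as a Seq of Rows, but verify the runtime type in your build):
import org.apache.spark.sql.Row
val result = sqlContext.sql("SELECT areas FROM testtable")   // re-add the userid predicate from the original query
val allAreas = result.flatMap { row =>
  row(0).asInstanceOf[Seq[Row]].map(_.getString(0))          // one area value per element of the nested array
}
allAreas.collect().foreach(println)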
GroupBy multiple attributes
Hello, I am trying to do a groupBy on 5 attributes to get results in a form like a pivot table in microsoft excel. The keys are the attribute tuples and values are double arrays(maybe very large). Based on the code below, I am getting back correct results, but would like to optimize it further(I played around with numPartitions). The two issues I see are - 1. flatMap is needed to expand the key tuples, but this also duplicates the values, and as the values are large this increases the shuffle input size for reduceByKey - is there a way to avoid the duplication? 2. reduceByKey is adding two arrays element wise, and creates a new array for every addition, is there a way to reduce by not creating a new array everytime(Similar to what accumulators do)? I am pasting a sample code, query plan and output below. Thanks. val attributeToFloatArrayRDD = sc.parallelize(Array( (A-1, B-2, C-1, D-1, E-1) - (0.0 to 1000.0 by 0.25).toArray , (A-2, B-1, C-1, D-2, E-1) - (5.0 to 1005.0 by 0.25).toArray , (A-1, B-1, C-1, D-1, E-1) - (0.0 to 1000.0 by 0.25).toArray , (A-3, B-3, C-1, D-1, E-2) - (0.0 to 1000.0 by 0.25).toArray , (A-1, B-1, C-1, D-1, E-1) - (0.0 to 1000.0 by 0.25).toArray , (A-4, B-3, C-1, D-1, E-1) - (8.0 to 1008.0 by 0.25).toArray , (A-1, B-1, C-1, D-1, E-1) - (0.0 to 1000.0 by 0.25).toArray )) val groupToVaRRDD = attributeToFloatArrayRDD .flatMap(x = x._1 match { case (t1, t2, t3, t4, t5) = Array((t1+_top), (t1, t2), (t1, t2, t3), (t1, t2, t3, t4), (t1, t2, t3, t4, t5)).map(y = (y, x._2)) }) .reduceByKey((x, y) = { require(x.size == y.size) (x,y).zipped.map(_ + _) }) .map(x = { (x._1, x._2.sorted.take(x._2.size/20).last) }) Query Plan (16) MappedRDD[12] at map at GroupByTest.scala:81 [] | ShuffledRDD[11] at reduceByKey at GroupByTest.scala:76 [] +-(16) FlatMappedRDD[10] at flatMap at GroupByTest.scala:68 [] | ParallelCollectionRDD[9] at parallelize at GroupByTest.scala:56 [] Output GroupBy VaR (A-2,B-1) 54.75 (A-2,B-1,C-1,D-2) 54.75 (A-1,B-1) 149.25 (A-1,B-1,C-1,D-1,E-1) 149.25 (A-3,B-3,C-1) 49.75 (A-3,B-3) 49.75 (A-4,B-3,C-1,D-1,E-1) 57.75 (A-2,B-1,C-1) 54.75 (A-1,B-2,C-1,D-1,E-1) 49.75 (A-1,B-1,C-1,D-1) 149.25 (A-3,B-3,C-1,D-1,E-2) 49.75 (A-1,B-2,C-1) 49.75 (A-3,B-3,C-1,D-1) 49.75 (A-4,B-3) 57.75 (A-1,B-1,C-1) 149.25 A-1_top 199.0 (A-4,B-3,C-1,D-1) 57.75 A-2_top 54.75 (A-1,B-2) 49.75 (A-4,B-3,C-1) 57.75 A-3_top 49.75 A-4_top 57.75 (A-2,B-1,C-1,D-2,E-1) 54.75 (A-1,B-2,C-1,D-1) 49.75
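On the second point, aggregateByKey lets the per-key buffer be mutated in place instead of allocating a new array on every merge (the zero value is copied per key, so mutating it is safe). A sketch, where expandedRDD stands for the flatMapped (groupKey, array) pairs above and 4001 is the length of the 0.0-to-1000.0-by-0.25 arrays in the example:
val summed = expandedRDD.aggregateByKey(Array.fill(4001)(0.0))(
  (acc, values) => { var i = 0; while (i < acc.length) { acc(i) += values(i); i += 1 }; acc },
  (a, b)        => { var i = 0; while (i < a.length)  { a(i)  += b(i);  i += 1 }; a }
)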
RE: spark 1.1.0 save data to hdfs failed
I also think the code is not robust enough. First, Spark works with hadoop1, why the code try hadoop2 first. Also the following code only handle ClassNotFoundException. It should handle all the exceptions. private def firstAvailableClass(first: String, second: String): Class[_] = { try { Class.forName(first)} catch { case e: ClassNotFoundException =Class.forName(second)} } From: eyc...@hotmail.com To: so...@cloudera.com CC: user@spark.apache.org Subject: RE: spark 1.1.0 save data to hdfs failed Date: Fri, 23 Jan 2015 06:43:00 -0800 I looked into the source code of SparkHadoopMapReduceUtil.scala. I think it is broken in the following code: def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = {val klass = firstAvailableClass( org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl, // hadoop2, hadoop2-yarnorg.apache.hadoop.mapreduce.TaskAttemptContext) // hadoop1val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID])ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext] } In other words, it is related to hadoop2, hadoop2-yarn, and hadoop1. Any suggestion how to resolve it? Thanks. From: so...@cloudera.com Date: Fri, 23 Jan 2015 14:01:45 + Subject: Re: spark 1.1.0 save data to hdfs failed To: eyc...@hotmail.com CC: user@spark.apache.org These are all definitely symptoms of mixing incompatible versions of libraries. I'm not suggesting you haven't excluded Spark / Hadoop, but, this is not the only way Hadoop deps get into your app. See my suggestion about investigating the dependency tree. On Fri, Jan 23, 2015 at 1:53 PM, ey-chih chow eyc...@hotmail.com wrote: Thanks. But I think I already mark all the Spark and Hadoop reps as provided. Why the cluster's version is not used? Any way, as I mentioned in the previous message, after changing the hadoop-client to version 1.2.1 in my maven deps, I already pass the exception and go to another one as indicated below. Any suggestion on this? 
= Exception in thread main java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40) at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala) Caused by: java.lang.IncompatibleClassChangeError: Implementing class at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:800) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:449) at java.net.URLClassLoader.access$100(URLClassLoader.java:71) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:191) at org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.firstAvailableClass(SparkHadoopMapReduceUtil.scala:73) at org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil$class.newTaskAttemptContext(SparkHadoopMapReduceUtil.scala:35) at org.apache.spark.rdd.PairRDDFunctions.newTaskAttemptContext(PairRDDFunctions.scala:53) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:932) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:832) at com.crowdstar.etl.ParseAndClean$.main(ParseAndClean.scala:103) at com.crowdstar.etl.ParseAndClean.main(ParseAndClean.scala) ... 6 more
Re: How to make spark partition sticky, i.e. stay with node?
I found a workaround. I can make my auxiliary data a RDD. Partition it and cache it. Later, I can cogroup it with other RDDs and Spark will try to keep the cached RDD partitions where they are and not shuffle them. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-spark-partition-sticky-i-e-stay-with-node-tp21322p21338.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
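For anyone hitting the same thing, a minimal sketch of that workaround (names are placeholders; auxData is assumed to be a collection of key/value pairs):
import org.apache.spark.HashPartitioner
val partitioner = new HashPartitioner(100)
val auxRDD = sc.parallelize(auxData).partitionBy(partitioner).cache()   // cached partitions stay on their nodes
auxRDD.count()                                                          // materialize the cache once
val joined = otherRDD.partitionBy(partitioner).cogroup(auxRDD)          // same partitioner, so auxRDD is not reshuffled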