Re: Some tasks are taking a long time
Thanks RK. I can turn on speculative execution, but I am trying to find out the actual reason for the delay, since it happens on any node. Any idea about the stack trace in my previous mail?
Regards,
Ajay

On Thursday, January 15, 2015 8:02 PM, RK prk...@yahoo.com.INVALID wrote:
If you don't want a few slow tasks to slow down the entire job, you can turn on speculation. Here are the speculation settings from the Spark Configuration - Spark 1.2.0 Documentation (http://spark.apache.org/docs/1.2.0/configuration.html):

spark.speculation (default: false) - If set to true, performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched.
spark.speculation.interval (default: 100) - How often Spark will check for tasks to speculate, in milliseconds.
spark.speculation.quantile (default: 0.75) - Percentage of tasks which must be complete before speculation is enabled for a particular stage.
spark.speculation.multiplier (default: 1.5) - How many times slower a task is than the median to be considered for speculation.

On Thursday, January 15, 2015 5:44 AM, Ajay Srivastava a_k_srivast...@yahoo.com.INVALID wrote:
Hi,
My spark job is taking a long time. I see that some tasks are taking longer for the same amount of data and shuffle read/write. What could be the possible reasons for it?

The thread dump sometimes shows that all the tasks in an executor are waiting with the following stack trace -

"Executor task launch worker-12" daemon prio=10 tid=0x7fcd44276000 nid=0x3f85 waiting on condition [0x7fcce3ddc000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for 0x7fd0aee82e00 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(Unknown Source)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)
    at java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
    at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:253)
    at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:77)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)

Any inputs/suggestions to improve job time will be appreciated.
Regards,
Ajay
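To make the speculation settings above concrete, here is a minimal sketch of enabling them programmatically (they can equally go in spark-defaults.conf or be passed with --conf on spark-submit); all values except spark.speculation are just the documented defaults, so only the first line is strictly needed:

val conf = new org.apache.spark.SparkConf()
  .set("spark.speculation", "true")
  .set("spark.speculation.interval", "100")    // ms between speculation checks
  .set("spark.speculation.quantile", "0.75")   // fraction of tasks that must finish first
  .set("spark.speculation.multiplier", "1.5")  // how much slower than the median counts as slow
val sc = new org.apache.spark.SparkContext(conf)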
Re: small error in the docs?
Yes, that's a typo. The API docs and source code are correct, though: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions That and your IDE should show the correct signature. You can open a PR to fix the typo in https://spark.apache.org/docs/latest/programming-guide.html

On Thu, Jan 15, 2015 at 4:43 PM, kirillfish k.rybac...@datacentric.ru wrote:
The cogroup() function seems to return (K, (Iterable<V>, Iterable<W>)), rather than (K, Iterable<V>, Iterable<W>) as it is pointed out in the docs (at least for version 1.1.0): https://spark.apache.org/docs/1.1.0/programming-guide.html This simple discrepancy cost me half a day of debugging and frustration. Kirill
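A small sketch illustrating the actual signature (the sample key/value pairs below are made up; the return type matches the PairRDDFunctions scaladoc linked above):

val left  = sc.parallelize(Seq((1, "a"), (2, "b")))
val right = sc.parallelize(Seq((1, "x"), (3, "y")))
// The two Iterables are wrapped in a tuple: RDD[(K, (Iterable[V], Iterable[W]))]
val grouped: org.apache.spark.rdd.RDD[(Int, (Iterable[String], Iterable[String]))] = left.cogroup(right)
grouped.collect().foreach { case (k, (vs, ws)) => println(s"$k -> ${vs.toList} / ${ws.toList}") }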
Re: Running beyond memory limits in ConnectedComponents
Replying to all Is this Overhead memory allocation used for any specific purpose. For example, will it be any different if I do *--executor-memory 22G *with overhead set to 0%(hypothetically) vs *--executor-memory 20G* and overhead memory to default(9%) which eventually brings the total memory asked by Spark to approximately 22G. On Thu, Jan 15, 2015 at 12:54 PM, Nitin kak nitinkak...@gmail.com wrote: Is this Overhead memory allocation used for any specific purpose. For example, will it be any different if I do *--executor-memory 22G *with overhead set to 0%(hypothetically) vs *--executor-memory 20G* and overhead memory to default(9%) which eventually brings the total memory asked by Spark to approximately 22G. On Thu, Jan 15, 2015 at 12:10 PM, Sean Owen so...@cloudera.com wrote: This is a YARN setting. It just controls how much any container can reserve, including Spark executors. That is not the problem. You need Spark to ask for more memory from YARN, on top of the memory that is requested by --executor-memory. Your output indicates the default of 7% is too little. For example you can ask for 20GB for executors and ask for 2GB of overhead. Spark will ask for 22GB from YARN. (Of course, YARN needs to be set to allow containers of at least 22GB!) On Thu, Jan 15, 2015 at 4:31 PM, Nitin kak nitinkak...@gmail.com wrote: Thanks for sticking to this thread. I am guessing what memory my app requests and what Yarn requests on my part should be same and is determined by the value of *--executor-memory* which I had set to *20G*. Or can the two values be different? I checked in Yarn configurations(below), so I think that fits well into the memory overhead limits. Container Memory Maximum yarn.scheduler.maximum-allocation-mb MiBGiB Reset to the default value: 64 GiB http://10.1.1.49:7180/cmf/services/108/config# Override Instances http://10.1.1.49:7180/cmf/service/108/roleType/RESOURCEMANAGER/group/yarn-RESOURCEMANAGER-BASE/config/yarn_scheduler_maximum_allocation_mb?wizardMode=falsereturnUrl=%2Fcmf%2Fservices%2F108%2FconfigfilterValue= The largest amount of physical memory, in MiB, that can be requested for a container. On Thu, Jan 15, 2015 at 10:28 AM, Sean Owen so...@cloudera.com wrote: Those settings aren't relevant, I think. You're concerned with what your app requests, and what Spark requests of YARN on your behalf. (Of course, you can't request more than what your cluster allows for a YARN container for example, but that doesn't seem to be what is happening here.) You do not want to omit --executor-memory if you need large executor memory heaps, since then you just request the default and that is evidently not enough memory for your app. Look at http://spark.apache.org/docs/latest/running-on-yarn.html and spark.yarn.executor.memoryOverhead By default it's 7% of your 20G or about 1.4G. You might set this higher to 2G to give more overhead. See the --config property=value syntax documented in http://spark.apache.org/docs/latest/submitting-applications.html On Thu, Jan 15, 2015 at 3:47 AM, Nitin kak nitinkak...@gmail.com wrote: Thanks Sean. I guess Cloudera Manager has parameters executor_total_max_heapsize and worker_max_heapsize which point to the parameters you mentioned above. How much should that cushon between the jvm heap size and yarn memory limit be? I tried setting jvm memory to 20g and yarn to 24g, but it gave the same error as above. 
Then, I removed the --executor-memory clause spark-submit --class ConnectedComponentsTest --master yarn-cluster --num-executors 7 --executor-cores 1 target/scala-2.10/connectedcomponentstest_2.10-1.0.jar That is not giving GC, Out of memory exception 15/01/14 21:20:33 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0x362d65d4, /10.1.1.33:35463 = /10.1.1.73:43389] EXCEPTION: java.lang.OutOfMemoryError: GC overhead limit exceeded) java.lang.OutOfMemoryError: GC overhead limit exceeded at java.lang.Object.clone(Native Method) at akka.util.CompactByteString$.apply(ByteString.scala:410) at akka.util.ByteString$.apply(ByteString.scala:22) at akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45) at akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57) at akka.remote.transport.netty.NettyServerHelpers$class.messageReceived(NettyHelpers.scala:43) at akka.remote.transport.netty.ServerHandler.messageReceived(NettyTransport.scala:179) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) at
Is spark suitable for large scale pagerank, such as 200 million nodes, 2 billion edges?
Hi, I am running PageRank on a large dataset, which includes 200 million nodes and 2 billion edges. Is Spark suitable for large scale PageRank? How many cores and how much memory do I need, and how long will it take? Thanks Xuewei Tang
Re: Testing if an RDD is empty?
You can also check rdd.partitions.size. This will be 0 for empty RDDs and greater than 0 for RDDs with data.
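A quick sketch of that check in the Scala shell (sc is the usual SparkContext; sc.emptyRDD is just one way to get an RDD with no partitions, and keep in mind an RDD parallelized from an empty collection may still report several empty partitions):

val empty = sc.emptyRDD[Int]
println(empty.partitions.size)              // 0
val withData = sc.parallelize(Seq(1, 2, 3))
println(withData.partitions.size)           // > 0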
Re: spark crashes on second or third call first() on file
What's the version of Spark you are using?

On Wed, Jan 14, 2015 at 12:00 AM, Linda Terlouw linda.terl...@icris.nl wrote:
I'm new to Spark. When I use the Movie Lens dataset 100k (http://grouplens.org/datasets/movielens/), Spark crashes when I run the following code. The first call to movieData.first() gives the correct result. Depending on which machine I use, it crashes the second or third time. Does anybody know why? When I use Scala in the Spark shell, this does not happen. Scala gives the correct result every time.

import sys
sys.path.append("d:/spark/python")
sys.path.append("d:/spark/python/lib/py4j-0.8.2.1-src.zip")

from pyspark import SparkContext
import numpy
import os

os.environ["SPARK_HOME"] = "d:/spark"
sc = SparkContext(appName="testapp")

movieData = sc.textFile("d:/moviedata/u.data")
movieData.first()
movieData.first()
movieData.first()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.net.SocketException: Connection reset by peer: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
    at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:592)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:389)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:388)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:388)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:242)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
    at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:203)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Re: Running beyond memory limits in ConnectedComponents
If you give the executor 22GB, it will run with ... -Xmx22g. If the JVM heap gets nearly full, it will almost certainly consume more than 22GB of physical memory, because the JVM needs memory for more than just heap. But in this scenario YARN was only asked for 22GB and it gets killed. This is exactly what the overhead setting is solving. The default is 7% not 9%, or 1.4GB for a 20GB executor heap, so a 2GB overhead is a bump up. It may or may not be sufficient; I just guessed. Any JVM program with X heap is going to potentially use more than X physical memory. The overhead setting attempts to account for that so that you aren't bothered setting both values. But sometimes you need to manually increase the overhead cushion if you see that YARN kills your program for using too much physical memory. That's not the same as the JVM running out of heap. On Thu, Jan 15, 2015 at 5:54 PM, Nitin kak nitinkak...@gmail.com wrote: Is this Overhead memory allocation used for any specific purpose. For example, will it be any different if I do *--executor-memory 22G *with overhead set to 0%(hypothetically) vs *--executor-memory 20G* and overhead memory to default(9%) which eventually brings the total memory asked by Spark to approximately 22G. On Thu, Jan 15, 2015 at 12:10 PM, Sean Owen so...@cloudera.com wrote: This is a YARN setting. It just controls how much any container can reserve, including Spark executors. That is not the problem. You need Spark to ask for more memory from YARN, on top of the memory that is requested by --executor-memory. Your output indicates the default of 7% is too little. For example you can ask for 20GB for executors and ask for 2GB of overhead. Spark will ask for 22GB from YARN. (Of course, YARN needs to be set to allow containers of at least 22GB!) On Thu, Jan 15, 2015 at 4:31 PM, Nitin kak nitinkak...@gmail.com wrote: Thanks for sticking to this thread. I am guessing what memory my app requests and what Yarn requests on my part should be same and is determined by the value of *--executor-memory* which I had set to *20G*. Or can the two values be different? I checked in Yarn configurations(below), so I think that fits well into the memory overhead limits. Container Memory Maximum yarn.scheduler.maximum-allocation-mb MiBGiB Reset to the default value: 64 GiB http://10.1.1.49:7180/cmf/services/108/config# Override Instances http://10.1.1.49:7180/cmf/service/108/roleType/RESOURCEMANAGER/group/yarn-RESOURCEMANAGER-BASE/config/yarn_scheduler_maximum_allocation_mb?wizardMode=falsereturnUrl=%2Fcmf%2Fservices%2F108%2FconfigfilterValue= The largest amount of physical memory, in MiB, that can be requested for a container. On Thu, Jan 15, 2015 at 10:28 AM, Sean Owen so...@cloudera.com wrote: Those settings aren't relevant, I think. You're concerned with what your app requests, and what Spark requests of YARN on your behalf. (Of course, you can't request more than what your cluster allows for a YARN container for example, but that doesn't seem to be what is happening here.) You do not want to omit --executor-memory if you need large executor memory heaps, since then you just request the default and that is evidently not enough memory for your app. Look at http://spark.apache.org/docs/latest/running-on-yarn.html and spark.yarn.executor.memoryOverhead By default it's 7% of your 20G or about 1.4G. You might set this higher to 2G to give more overhead. 
See the --config property=value syntax documented in http://spark.apache.org/docs/latest/submitting-applications.html On Thu, Jan 15, 2015 at 3:47 AM, Nitin kak nitinkak...@gmail.com wrote: Thanks Sean. I guess Cloudera Manager has parameters executor_total_max_heapsize and worker_max_heapsize which point to the parameters you mentioned above. How much should that cushon between the jvm heap size and yarn memory limit be? I tried setting jvm memory to 20g and yarn to 24g, but it gave the same error as above. Then, I removed the --executor-memory clause spark-submit --class ConnectedComponentsTest --master yarn-cluster --num-executors 7 --executor-cores 1 target/scala-2.10/connectedcomponentstest_2.10-1.0.jar That is not giving GC, Out of memory exception 15/01/14 21:20:33 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0x362d65d4, /10.1.1.33:35463 = /10.1.1.73:43389] EXCEPTION: java.lang.OutOfMemoryError: GC overhead limit exceeded) java.lang.OutOfMemoryError: GC overhead limit exceeded at java.lang.Object.clone(Native Method) at akka.util.CompactByteString$.apply(ByteString.scala:410) at akka.util.ByteString$.apply(ByteString.scala:22) at akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45) at akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57) at
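Putting Sean's numbers above into the submit command used earlier in this thread, a hypothetical invocation that requests a 20G executor heap plus an explicit 2G (2048 MB) overhead, so that YARN is asked for roughly 22G per container, might look like this (the flag for setting arbitrary properties with spark-submit is --conf; class, jar and executor counts are simply the ones already posted in the thread):

spark-submit --class ConnectedComponentsTest --master yarn-cluster \
  --num-executors 7 --executor-cores 1 \
  --executor-memory 20G \
  --conf spark.yarn.executor.memoryOverhead=2048 \
  target/scala-2.10/connectedcomponentstest_2.10-1.0.jar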
Re: Some tasks are taking a long time
Ajay,
Unless we are dealing with some synchronization/conditional variable bug in Spark, try this per the tuning guide:

Cache Size Tuning
One important configuration parameter for GC is the amount of memory that should be used for caching RDDs. By default, Spark uses 60% of the configured executor memory (spark.executor.memory) to cache RDDs. This means that 40% of memory is available for any objects created during task execution. In case your tasks slow down and you find that your JVM is garbage-collecting frequently or running out of memory, lowering this value will help reduce the memory consumption. To change this to, say, 50%, you can call conf.set("spark.storage.memoryFraction", "0.5") on your SparkConf. Combined with the use of serialized caching, using a smaller cache should be sufficient to mitigate most of the garbage collection problems. In case you are interested in further tuning the Java GC, continue reading below.

Complete list of tips here: https://spark.apache.org/docs/latest/tuning.html#serialized-rdd-storage

Cheers,
- Nicos

On Jan 15, 2015, at 6:49 AM, Ajay Srivastava a_k_srivast...@yahoo.com.INVALID wrote:
Thanks RK. I can turn on speculative execution, but I am trying to find out the actual reason for the delay, since it happens on any node. Any idea about the stack trace in my previous mail?
Regards,
Ajay

On Thursday, January 15, 2015 8:02 PM, RK prk...@yahoo.com.INVALID wrote:
If you don't want a few slow tasks to slow down the entire job, you can turn on speculation. Here are the speculation settings from the Spark Configuration - Spark 1.2.0 Documentation (http://spark.apache.org/docs/1.2.0/configuration.html):

spark.speculation (default: false) - If set to true, performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched.
spark.speculation.interval (default: 100) - How often Spark will check for tasks to speculate, in milliseconds.
spark.speculation.quantile (default: 0.75) - Percentage of tasks which must be complete before speculation is enabled for a particular stage.
spark.speculation.multiplier (default: 1.5) - How many times slower a task is than the median to be considered for speculation.

On Thursday, January 15, 2015 5:44 AM, Ajay Srivastava a_k_srivast...@yahoo.com.INVALID wrote:
Hi,
My spark job is taking a long time. I see that some tasks are taking longer for the same amount of data and shuffle read/write. What could be the possible reasons for it?
The thread-dump sometimes show that all the tasks in an executor are waiting with following stack trace - Executor task launch worker-12 daemon prio=10 tid=0x7fcd44276000 nid=0x3f85 waiting on condition [0x7fcce3ddc000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 0x7fd0aee82e00 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(Unknown Source) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source) at java.util.concurrent.LinkedBlockingQueue.take(Unknown Source) at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:253) at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:77) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at
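As a rough sketch of Nicos' two suggestions above (a smaller spark.storage.memoryFraction plus serialized caching), under the assumption that you control the SparkConf used to build the context; the input path and RDD name here are made up:

import org.apache.spark.storage.StorageLevel

val conf = new org.apache.spark.SparkConf()
  .set("spark.storage.memoryFraction", "0.5")    // shrink the RDD cache from the 60% default
val sc = new org.apache.spark.SparkContext(conf)
val events = sc.textFile("hdfs:///some/input")   // hypothetical input path
events.persist(StorageLevel.MEMORY_ONLY_SER)     // serialized caching, as the tuning guide suggests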
using hiveContext to select a nested Map-data-type from an AVROmodel+parquet file
Hi all, any help on the following is very much appreciated.
=
Problem: On a schemaRDD read from a parquet file (the data within the file uses an AVRO model) using the HiveContext, I can't figure out how to 'select' or use a 'where' clause to filter rows on a field that has a Map AVRO data type. I want to filter using a given ('key' : 'value'). How could I do this?

Details:
* the printSchema of the loaded schemaRDD is like so:
-- output snippet -
 |-- created: long (nullable = false)
 |-- audiences: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = false)
 |    |    |-- percent: float (nullable = false)
 |    |    |-- cluster: integer (nullable = false)
-

* I don't get a result when I try to select on a specific value of the 'audiences' like so:

SELECT created, audiences FROM mytable_data LATERAL VIEW explode(audiences) adtab AS adcol WHERE audiences['key']=='tg_loh' LIMIT 10

The sequence of commands on the spark-shell (a different query and output) is:
-- code snippet -
scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala> val parquetFile2 = hiveContext.parquetFile("/home/myuser/myparquetfile")
scala> parquetFile2.registerTempTable("mytable_data")
scala> hiveContext.cacheTable("mytable_data")
scala> hiveContext.sql("SELECT audiences['key'], audiences['value'] FROM mytable_data LATERAL VIEW explode(audiences) adu AS audien LIMIT 3").collect().foreach(println)
-- output -
[null,null]
[null,null]
[null,null]

gives a list of nulls. I can see that there is data when I just do the following (output is truncated):
-- code snippet -
scala> hiveContext.sql("SELECT audiences FROM mytable_data LATERAL VIEW explode(audiences) tablealias AS colalias LIMIT 1").collect().foreach(println)
output --
[Map(tg_loh -> [0.0,1,Map()], tg_co -> [0.0,1,Map(tg_co_petrol -> 0.0)], tg_wall -> [0.0,1,Map(tg_wall_poi -> 0.0)], ...

Q1) What am I doing wrong?
Q2) How can I use 'where' in the query to filter on specific values?

What works: Queries with filtering and selecting on fields that have simple AVRO data types, such as long or string, work fine.
===
I hope the explanation makes sense. Thanks.
Best,
BB
Re: dockerized spark executor on mesos?
The AMPLab maintains a bunch of Docker files for Spark here: https://github.com/amplab/docker-scripts Hasn't been updated since 1.0.0, but might be a good starting point. On Wed Jan 14 2015 at 12:14:13 PM Josh J joshjd...@gmail.com wrote: We have dockerized Spark Master and worker(s) separately and are using it in our dev environment. Is this setup available on github or dockerhub? On Tue, Dec 9, 2014 at 3:50 PM, Venkat Subramanian vsubr...@gmail.com wrote: We have dockerized Spark Master and worker(s) separately and are using it in our dev environment. We don't use Mesos though, running it in Standalone mode, but adding Mesos should not be that difficult I think. Regards Venkat
Re: Running beyond memory limits in ConnectedComponents
This is a YARN setting. It just controls how much any container can reserve, including Spark executors. That is not the problem. You need Spark to ask for more memory from YARN, on top of the memory that is requested by --executor-memory. Your output indicates the default of 7% is too little. For example you can ask for 20GB for executors and ask for 2GB of overhead. Spark will ask for 22GB from YARN. (Of course, YARN needs to be set to allow containers of at least 22GB!) On Thu, Jan 15, 2015 at 4:31 PM, Nitin kak nitinkak...@gmail.com wrote: Thanks for sticking to this thread. I am guessing what memory my app requests and what Yarn requests on my part should be same and is determined by the value of *--executor-memory* which I had set to *20G*. Or can the two values be different? I checked in Yarn configurations(below), so I think that fits well into the memory overhead limits. Container Memory Maximum yarn.scheduler.maximum-allocation-mb MiBGiB Reset to the default value: 64 GiB http://10.1.1.49:7180/cmf/services/108/config# Override Instances http://10.1.1.49:7180/cmf/service/108/roleType/RESOURCEMANAGER/group/yarn-RESOURCEMANAGER-BASE/config/yarn_scheduler_maximum_allocation_mb?wizardMode=falsereturnUrl=%2Fcmf%2Fservices%2F108%2FconfigfilterValue= The largest amount of physical memory, in MiB, that can be requested for a container. On Thu, Jan 15, 2015 at 10:28 AM, Sean Owen so...@cloudera.com wrote: Those settings aren't relevant, I think. You're concerned with what your app requests, and what Spark requests of YARN on your behalf. (Of course, you can't request more than what your cluster allows for a YARN container for example, but that doesn't seem to be what is happening here.) You do not want to omit --executor-memory if you need large executor memory heaps, since then you just request the default and that is evidently not enough memory for your app. Look at http://spark.apache.org/docs/latest/running-on-yarn.html and spark.yarn.executor.memoryOverhead By default it's 7% of your 20G or about 1.4G. You might set this higher to 2G to give more overhead. See the --config property=value syntax documented in http://spark.apache.org/docs/latest/submitting-applications.html On Thu, Jan 15, 2015 at 3:47 AM, Nitin kak nitinkak...@gmail.com wrote: Thanks Sean. I guess Cloudera Manager has parameters executor_total_max_heapsize and worker_max_heapsize which point to the parameters you mentioned above. How much should that cushon between the jvm heap size and yarn memory limit be? I tried setting jvm memory to 20g and yarn to 24g, but it gave the same error as above. 
Then, I removed the --executor-memory clause spark-submit --class ConnectedComponentsTest --master yarn-cluster --num-executors 7 --executor-cores 1 target/scala-2.10/connectedcomponentstest_2.10-1.0.jar That is not giving GC, Out of memory exception 15/01/14 21:20:33 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0x362d65d4, /10.1.1.33:35463 = /10.1.1.73:43389] EXCEPTION: java.lang.OutOfMemoryError: GC overhead limit exceeded) java.lang.OutOfMemoryError: GC overhead limit exceeded at java.lang.Object.clone(Native Method) at akka.util.CompactByteString$.apply(ByteString.scala:410) at akka.util.ByteString$.apply(ByteString.scala:22) at akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45) at akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57) at akka.remote.transport.netty.NettyServerHelpers$class.messageReceived(NettyHelpers.scala:43) at akka.remote.transport.netty.ServerHandler.messageReceived(NettyTransport.scala:179) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at
Re: SQL JSON array operations
You could try to use the hive context, which brings HiveQL; it would allow you to query nested structures using LATERAL VIEW explode...

On Jan 15, 2015 4:03 PM, jvuillermet jeremy.vuiller...@gmail.com wrote:
let's say my json file lines look like this
{"user": "baz", "tags" : ["foo", "bar"] }
sqlContext.jsonFile("data.json")
...
How could I query for users with the "bar" tag using SQL?
sqlContext.sql("select user from users where tags ?contains? 'bar' ")
I could simplify the request and use the returned RDD to filter on tags, but I'm exploring an app where users can write their SQL queries.
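A minimal sketch of what that could look like, assuming Spark with Hive support built in; the file name and column names are taken from the question, and the exact HiveQL accepted may vary by version:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.jsonFile("data.json").registerTempTable("users")
// explode the tags array into one row per tag, then filter on the tag value
hiveContext.sql(
  "SELECT user FROM users LATERAL VIEW explode(tags) t AS tag WHERE tag = 'bar'"
).collect().foreach(println)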
Re: dockerized spark executor on mesos?
Just throwing this out here, there is existing PR to add docker support for spark framework to launch executors with docker image. https://github.com/apache/spark/pull/3074 Hopefully this will be merged sometime. Tim On Thu, Jan 15, 2015 at 9:18 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: The AMPLab maintains a bunch of Docker files for Spark here: https://github.com/amplab/docker-scripts Hasn't been updated since 1.0.0, but might be a good starting point. On Wed Jan 14 2015 at 12:14:13 PM Josh J joshjd...@gmail.com wrote: We have dockerized Spark Master and worker(s) separately and are using it in our dev environment. Is this setup available on github or dockerhub? On Tue, Dec 9, 2014 at 3:50 PM, Venkat Subramanian vsubr...@gmail.com wrote: We have dockerized Spark Master and worker(s) separately and are using it in our dev environment. We don't use Mesos though, running it in Standalone mode, but adding Mesos should not be that difficult I think. Regards Venkat -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/dockerized-spark-executor-on-mesos-tp20276p20603.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Is spark suitable for large scale pagerank, such as 200 million nodes, 2 billion edges?
Have you seen http://search-hadoop.com/m/JW1q5pE3P12 ? Please also take a look at the end-to-end performance graph on http://spark.apache.org/graphx/ Cheers On Thu, Jan 15, 2015 at 9:29 AM, txw t...@outlook.com wrote: Hi, I am run PageRank on a large dataset, which include 200 million nodes and 2 billion edges? Is spark suitable for large scale pagerank? How many cores and MEM do I need and how long will it take? Thanks Xuewei Tang
SQL JSON array operations
let's say my json file lines look like this
{"user": "baz", "tags" : ["foo", "bar"] }
sqlContext.jsonFile("data.json")
...
How could I query for users with the "bar" tag using SQL?
sqlContext.sql("select user from users where tags ?contains? 'bar' ")
I could simplify the request and use the returned RDD to filter on tags, but I'm exploring an app where users can write their SQL queries.
Re: Inserting an element in RDD[String]
Thanks

On Thu, Jan 15, 2015 at 7:35 PM, Prannoy [via Apache Spark User List] ml-node+s1001560n21163...@n3.nabble.com wrote:
Hi,
You can take the schema line in another rdd and then do a union of the two rdds.

List<String> schemaList = new ArrayList<String>();
schemaList.add("xyz"); // where "xyz" is your schema line
JavaRDD<String> schemaRDD = sc.parallelize(schemaList); // where sc is your sparkcontext
JavaRDD<String> newRDD = schemaRDD.union(yourRDD); // where yourRDD is the rdd at the start of which you want to add the schema line

The code is in java, you can change it to scala.
Thanks.

On Thu, Jan 15, 2015 at 7:46 PM, Hafiz Mujadid [via Apache Spark User List] wrote:
hi experts! I have an RDD[String] and I want to add a schema line at the beginning of this rdd. I know RDD is immutable. So is there any way to have a new rdd with one schema line and the contents of the previous rdd? Thanks

--
Regards: HAFIZ MUJADID
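Since the reply notes the Java snippet can be translated, here is a rough Scala equivalent of the same union trick (the schema line and sample data below are made up; sc is the usual SparkContext):

val dataRdd = sc.parallelize(Seq("1,a,x", "2,b,y"))   // stand-in for your existing RDD[String]
val header  = sc.parallelize(Seq("col1,col2,col3"))   // hypothetical schema line
val withHeader = header.union(dataRdd)
withHeader.collect().foreach(println)                 // the header partition comes first, so the schema line leads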
Re: Running beyond memory limits in ConnectedComponents
Those settings aren't relevant, I think. You're concerned with what your app requests, and what Spark requests of YARN on your behalf. (Of course, you can't request more than what your cluster allows for a YARN container for example, but that doesn't seem to be what is happening here.) You do not want to omit --executor-memory if you need large executor memory heaps, since then you just request the default and that is evidently not enough memory for your app. Look at http://spark.apache.org/docs/latest/running-on-yarn.html and spark.yarn.executor.memoryOverhead By default it's 7% of your 20G or about 1.4G. You might set this higher to 2G to give more overhead. See the --config property=value syntax documented in http://spark.apache.org/docs/latest/submitting-applications.html On Thu, Jan 15, 2015 at 3:47 AM, Nitin kak nitinkak...@gmail.com wrote: Thanks Sean. I guess Cloudera Manager has parameters executor_total_max_heapsize and worker_max_heapsize which point to the parameters you mentioned above. How much should that cushon between the jvm heap size and yarn memory limit be? I tried setting jvm memory to 20g and yarn to 24g, but it gave the same error as above. Then, I removed the --executor-memory clause spark-submit --class ConnectedComponentsTest --master yarn-cluster --num-executors 7 --executor-cores 1 target/scala-2.10/connectedcomponentstest_2.10-1.0.jar That is not giving GC, Out of memory exception 15/01/14 21:20:33 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0x362d65d4, /10.1.1.33:35463 = /10.1.1.73:43389] EXCEPTION: java.lang.OutOfMemoryError: GC overhead limit exceeded) java.lang.OutOfMemoryError: GC overhead limit exceeded at java.lang.Object.clone(Native Method) at akka.util.CompactByteString$.apply(ByteString.scala:410) at akka.util.ByteString$.apply(ByteString.scala:22) at akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45) at akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57) at akka.remote.transport.netty.NettyServerHelpers$class.messageReceived(NettyHelpers.scala:43) at akka.remote.transport.netty.ServerHandler.messageReceived(NettyTransport.scala:179) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/01/14 21:20:33 ERROR util.Utils: Uncaught exception in thread SparkListenerBus java.lang.OutOfMemoryError: GC overhead limit exceeded at 
scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168) at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.json4s.JsonDSL$class.seq2jvalue(JsonDSL.scala:68) at org.json4s.JsonDSL$.seq2jvalue(JsonDSL.scala:61) at org.apache.spark.util.JsonProtocol$$anonfun$jobStartToJson$3.apply(JsonProtocol.scala:127) at org.apache.spark.util.JsonProtocol$$anonfun$jobStartToJson$3.apply(JsonProtocol.scala:127) at org.json4s.JsonDSL$class.pair2jvalue(JsonDSL.scala:79) at org.json4s.JsonDSL$.pair2jvalue(JsonDSL.scala:61) at org.apache.spark.util.JsonProtocol$.jobStartToJson(JsonProtocol.scala:127) at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:59) at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:92) at
Re: Running beyond memory limits in ConnectedComponents
I am sorry for the formatting error, the value for *yarn.scheduler.maximum-allocation-mb = 28G* On Thu, Jan 15, 2015 at 11:31 AM, Nitin kak nitinkak...@gmail.com wrote: Thanks for sticking to this thread. I am guessing what memory my app requests and what Yarn requests on my part should be same and is determined by the value of *--executor-memory* which I had set to *20G*. Or can the two values be different? I checked in Yarn configurations(below), so I think that fits well into the memory overhead limits. Container Memory Maximum yarn.scheduler.maximum-allocation-mb MiBGiB Reset to the default value: 64 GiB http://10.1.1.49:7180/cmf/services/108/config# Override Instances http://10.1.1.49:7180/cmf/service/108/roleType/RESOURCEMANAGER/group/yarn-RESOURCEMANAGER-BASE/config/yarn_scheduler_maximum_allocation_mb?wizardMode=falsereturnUrl=%2Fcmf%2Fservices%2F108%2FconfigfilterValue= The largest amount of physical memory, in MiB, that can be requested for a container. On Thu, Jan 15, 2015 at 10:28 AM, Sean Owen so...@cloudera.com wrote: Those settings aren't relevant, I think. You're concerned with what your app requests, and what Spark requests of YARN on your behalf. (Of course, you can't request more than what your cluster allows for a YARN container for example, but that doesn't seem to be what is happening here.) You do not want to omit --executor-memory if you need large executor memory heaps, since then you just request the default and that is evidently not enough memory for your app. Look at http://spark.apache.org/docs/latest/running-on-yarn.html and spark.yarn.executor.memoryOverhead By default it's 7% of your 20G or about 1.4G. You might set this higher to 2G to give more overhead. See the --config property=value syntax documented in http://spark.apache.org/docs/latest/submitting-applications.html On Thu, Jan 15, 2015 at 3:47 AM, Nitin kak nitinkak...@gmail.com wrote: Thanks Sean. I guess Cloudera Manager has parameters executor_total_max_heapsize and worker_max_heapsize which point to the parameters you mentioned above. How much should that cushon between the jvm heap size and yarn memory limit be? I tried setting jvm memory to 20g and yarn to 24g, but it gave the same error as above. 
Then, I removed the --executor-memory clause spark-submit --class ConnectedComponentsTest --master yarn-cluster --num-executors 7 --executor-cores 1 target/scala-2.10/connectedcomponentstest_2.10-1.0.jar That is not giving GC, Out of memory exception 15/01/14 21:20:33 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0x362d65d4, /10.1.1.33:35463 = /10.1.1.73:43389] EXCEPTION: java.lang.OutOfMemoryError: GC overhead limit exceeded) java.lang.OutOfMemoryError: GC overhead limit exceeded at java.lang.Object.clone(Native Method) at akka.util.CompactByteString$.apply(ByteString.scala:410) at akka.util.ByteString$.apply(ByteString.scala:22) at akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45) at akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57) at akka.remote.transport.netty.NettyServerHelpers$class.messageReceived(NettyHelpers.scala:43) at akka.remote.transport.netty.ServerHandler.messageReceived(NettyTransport.scala:179) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/01/14 21:20:33 ERROR util.Utils: Uncaught exception in thread SparkListenerBus java.lang.OutOfMemoryError: GC overhead limit exceeded at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168) at
Re: Running beyond memory limits in ConnectedComponents
Thanks for sticking to this thread. I am guessing what memory my app requests and what Yarn requests on my part should be same and is determined by the value of *--executor-memory* which I had set to *20G*. Or can the two values be different? I checked in Yarn configurations(below), so I think that fits well into the memory overhead limits. Container Memory Maximum yarn.scheduler.maximum-allocation-mb MiBGiB Reset to the default value: 64 GiB http://10.1.1.49:7180/cmf/services/108/config# Override Instances http://10.1.1.49:7180/cmf/service/108/roleType/RESOURCEMANAGER/group/yarn-RESOURCEMANAGER-BASE/config/yarn_scheduler_maximum_allocation_mb?wizardMode=falsereturnUrl=%2Fcmf%2Fservices%2F108%2FconfigfilterValue= The largest amount of physical memory, in MiB, that can be requested for a container. On Thu, Jan 15, 2015 at 10:28 AM, Sean Owen so...@cloudera.com wrote: Those settings aren't relevant, I think. You're concerned with what your app requests, and what Spark requests of YARN on your behalf. (Of course, you can't request more than what your cluster allows for a YARN container for example, but that doesn't seem to be what is happening here.) You do not want to omit --executor-memory if you need large executor memory heaps, since then you just request the default and that is evidently not enough memory for your app. Look at http://spark.apache.org/docs/latest/running-on-yarn.html and spark.yarn.executor.memoryOverhead By default it's 7% of your 20G or about 1.4G. You might set this higher to 2G to give more overhead. See the --config property=value syntax documented in http://spark.apache.org/docs/latest/submitting-applications.html On Thu, Jan 15, 2015 at 3:47 AM, Nitin kak nitinkak...@gmail.com wrote: Thanks Sean. I guess Cloudera Manager has parameters executor_total_max_heapsize and worker_max_heapsize which point to the parameters you mentioned above. How much should that cushon between the jvm heap size and yarn memory limit be? I tried setting jvm memory to 20g and yarn to 24g, but it gave the same error as above. 
Then, I removed the --executor-memory clause spark-submit --class ConnectedComponentsTest --master yarn-cluster --num-executors 7 --executor-cores 1 target/scala-2.10/connectedcomponentstest_2.10-1.0.jar That is not giving GC, Out of memory exception 15/01/14 21:20:33 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0x362d65d4, /10.1.1.33:35463 = /10.1.1.73:43389] EXCEPTION: java.lang.OutOfMemoryError: GC overhead limit exceeded) java.lang.OutOfMemoryError: GC overhead limit exceeded at java.lang.Object.clone(Native Method) at akka.util.CompactByteString$.apply(ByteString.scala:410) at akka.util.ByteString$.apply(ByteString.scala:22) at akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45) at akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57) at akka.remote.transport.netty.NettyServerHelpers$class.messageReceived(NettyHelpers.scala:43) at akka.remote.transport.netty.ServerHandler.messageReceived(NettyTransport.scala:179) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/01/14 21:20:33 ERROR util.Utils: Uncaught exception in thread SparkListenerBus java.lang.OutOfMemoryError: GC overhead limit exceeded at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168) at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at
Re: save spark streaming output to single file on hdfs
Hi,
You can use the FileUtil.copyMerge API and give it the path of the folder where saveAsTextFile saved the part text files. Suppose your directory is /a/b/c/: call FileUtil.copyMerge(FileSystem of the source, /a/b/c, FileSystem of the destination, path to the merged file, say /a/b/c.txt, true (to delete the original dir), the Hadoop configuration, null).
Thanks.

On Tue, Jan 13, 2015 at 11:34 PM, jamborta [via Apache Spark User List] ml-node+s1001560n21124...@n3.nabble.com wrote:
Hi all,
Is there a way to save dstream RDDs to a single file so that another process can pick it up as a single RDD? It seems that each slice is saved to a separate folder, using the saveAsTextFiles method. I'm using spark 1.2 with pyspark.
thanks,
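A small sketch of that call (the /a/b/c directory and /a/b/c.txt target are the hypothetical paths from the reply; this uses the Hadoop 2.x FileUtil.copyMerge signature):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val hadoopConf = new Configuration()
val fs = FileSystem.get(hadoopConf)
FileUtil.copyMerge(fs, new Path("/a/b/c"), fs, new Path("/a/b/c.txt"),
  true,        // delete the source directory after merging
  hadoopConf,
  null)        // no separator string appended between the merged files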
small error in the docs?
The cogroup() function seems to return (K, (Iterable<V>, Iterable<W>)), rather than (K, Iterable<V>, Iterable<W>) as it is pointed out in the docs (at least for version 1.1.0): https://spark.apache.org/docs/1.1.0/programming-guide.html This simple discrepancy cost me half a day of debugging and frustration. Kirill
Re: Error when running SparkPi on Secure HA Hadoop cluster
You're specifying the queue in the spark-submit command line: --queue thequeue Are you sure that queue exists? On Thu, Jan 15, 2015 at 11:23 AM, Manoj Samel manojsamelt...@gmail.com wrote: Hi, Setup is as follows Hadoop Cluster 2.3.0 (CDH5.0) - Namenode HA - Resource manager HA - Secured with Kerberos Spark 1.2 Run SparkPi as follows - conf/spark-defaults.conf has following entries spark.yarn.queue myqueue spark.yarn.access.namenodes hdfs://namespace (remember this is namenode HA) - Do kinit with some user keytab - submit SparkPi as follows spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 --queue thequeue $MY_SPARK_DIR/lib/spark-examples*.jar 10 Gives following trace (not sure why it shows unknown queue when queue name is specified in the spark-defaults.conf above. 15/01/15 19:18:27 INFO impl.YarnClientImpl: Submitted application application_1415648563285_31469 15/01/15 19:18:28 INFO yarn.Client: Application report for application_1415648563285_31469 (state: FAILED) 15/01/15 19:18:28 INFO yarn.Client: client token: N/A diagnostics: Application application_1415648563285_31469 submitted by user XYZ to unknown queue: thequeue --- WHY UNKNOWN QUEUE ??? ApplicationMaster host: N/A ApplicationMaster RPC port: -1 queue: thequeue --- WHY UNKNOWN QUEUE ??? start time: 1421349507652 final status: FAILED tracking URL: N/A user: XYZ Exception in thread main org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master. at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:102) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:58) -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
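For what it's worth, a hypothetical re-submission that drops the conflicting command-line flag and lets spark-defaults.conf supply the queue (spark.yarn.queue myqueue, as configured above) would simply be the original command minus --queue thequeue:

spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client \
  --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 \
  $MY_SPARK_DIR/lib/spark-examples*.jar 10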
Re: SQL JSON array operations
yeah that's where I ended up. Thanks ! I'll give it a try. On Thu, Jan 15, 2015 at 8:46 PM, Ayoub [via Apache Spark User List] ml-node+s1001560n21172...@n3.nabble.com wrote: You could try to use the hive context, which brings HiveQL; it would allow you to query nested structures using LATERAL VIEW explode... see the doc here: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView
Executor parameter doesn't work for Spark-shell on EMR Yarn
Hi All, I am testing Spark on EMR cluster. Env is a one node cluster r3.8xlarge. Has 32 vCore and 244G memory. But the command line I use to start up spark-shell, it can't work. For example: ~/spark/bin/spark-shell --jars /home/hadoop/vrisc-lib/aws-java-sdk-1.9.14/lib/*.jar --num-executors 6 --executor-memory 10G Neither num-executors nor memory setup works. And more interesting, if I use test code: val lines = sc.parallelize(List(-240990|161327,9051480,0,2,30.48,75, -240990|161324,9051480,0,2,30.48,75)) var count = lines.mapPartitions(dynamoDBBatchWriteFunc).collect.sum It will start 32 executors (then I assume it try to start all executors for every vCore). But if I use some real data to do it (the file size is 200M): val lines = sc.textFile(s3://.../part-r-0) var count = lines.mapPartitions(dynamoDBBatchWriteFunc).collect.sum It will only start 4 executors, which map to the number of HDFS split (200M will have 4 splits). So I have two questions: 1, Why the setup parameter is ignored by Yarn? How can I limit the number of executors I can run? 2, Why my much smaller test data set will trigger 32 executors but my real 200M data set will only have 4 executors? So how should I control the executor setup on the spark-shell? And I print the sparkConf, it looks like much less than I expect, and I don't see my pass in parameter show there. scala sc.getConf.getAll.foreach(println) (spark.tachyonStore.folderName,spark-af0c4d42-fe4d-40b0-a3cf-25b6a9e16fa0) (spark.app.id,local-1421353031552) (spark.eventLog.enabled,true) (spark.executor.id,driver) (spark.repl.class.uri,http://10.181.82.38:58415) (spark.driver.host,ip-10-181-82-38.ec2.internal) (spark.executor.extraJavaOptions,-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70) (spark.app.name,Spark shell) (spark.fileserver.uri,http://10.181.82.38:54666) (spark.jars,file:/home/hadoop/vrisc-lib/aws-java-sdk-1.9.14/lib/aws-java-sdk -1.9.14.jar) (spark.eventLog.dir,hdfs:///spark-logs) (spark.executor.extraClassPath,/home/hadoop/spark/classpath/emr/*:/home/hado op/spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hado op/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar) (spark.master,local[*]) (spark.driver.port,54191) (spark.driver.extraClassPath,/home/hadoop/spark/classpath/emr/*:/home/hadoop /spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop /.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar) I search the old threads, attached email answer the question about why vCore setup doesn't work. But I think this is not same issue as me. Otherwise then default Yarn Spark setup can't do any adjustment? Regards, Shuai ---BeginMessage--- If you are using capacity scheduler in yarn: By default yarn capacity scheduler uses DefaultResourceCalculator. DefaultResourceCalculator consider¹s only memory while allocating contains. You can use DominantResourceCalculator, it considers memory and cpu. In capacity-scheduler.xml set yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.res ource.DefaultResourceCalculator On 04/11/14 3:03 am, Gen gen.tan...@gmail.com wrote: Hi, Well, I doesn't find original documentation, but according to http://qnalist.com/questions/2791828/about-the-cpu-cores-and-cpu-usage http://qnalist.com/questions/2791828/about-the-cpu-cores-and-cpu-usage , the vcores is not for physics cpu core but for virtual cores. And I used top command to monitor the cpu utilization during the spark task. 
The spark can use all cpu even I leave --executor-cores as default(1). Hope that it can be a help. Cheers Gen Gen wrote Hi, Maybe it is a stupid question, but I am running spark on yarn. I request the resources by the following command: {code} ./spark-submit --master yarn-client --num-executors #number of worker --executor-cores #number of cores. ... {code} However, after launching the task, I use / yarn node -status ID / to monitor the situation of cluster. It shows that the number of Vcores used for each container is always 1 no matter what number I pass by --executor-cores. Any ideas how to solve this problem? Thanks a lot in advance for your help. Cheers Gen -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/executor-cores-cannot- change-vcores-in-yarn-tp17883p17992.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org ---End Message---
Re: SQL JSON array operations
You could try to use the hive context, which brings HiveQL; it would allow you to query nested structures using LATERAL VIEW explode... see the doc here: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView
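A rough sketch of the idea (the table and column names here are made up for illustration; assumes an existing SparkContext sc):

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)
    hiveContext.jsonFile("events.json").registerTempTable("events")
    // explode() turns each element of the JSON array column into its own row
    hiveContext.sql(
      "SELECT id, item FROM events LATERAL VIEW explode(items) t AS item"
    ).collect().foreach(println)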
Executor vs Mapper in Hadoop
Hi All, I try to clarify some behavior in the spark for executor. Because I am from Hadoop background, so I try to compare it to the Mapper (or reducer) in hadoop. 1, Each node can have multiple executors, each run in its own process? This is same as mapper process. 2, I thought the spark executor will use multi-thread mode when there are more than 1 core to allocate to it (for example: set executor-cores to 5). In this way, how many partition it can process? For example, if input are 20 partitions (similar as 20 split as mapper input) and we have 5 executors, each has 4 cores. Will all these partitions will be proceed as the same time (so each core process one partition) or actually one executor can only run one partition at the same time? I don't know whether my understand is correct, please suggest. BTW: In general practice, should we always try to set the executor-cores to a higher number? So we will favor 10 cores * 2 executor than 2 cores*10 executors? Any suggestion here? Thanks! Regards, Shuai
How to force parallel processing of RDD using multiple thread
I have a standalone spark cluster with only one node with 4 CPU cores. How can I force spark to do parallel processing of my RDD using multiple threads? For example I can do the following Spark-submit --master local[4] However I really want to use the cluster as follow Spark-submit --master spark://10.125.21.15:7070 In that case, how can I make sure the RDD is processed with multiple threads/cores? Thanks Ningjun
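The thread doesn't settle on an answer, but as a generic sketch, the two knobs involved are the application's core cap on the standalone cluster and the RDD's partition count (the values below are placeholders, not a verified fix for this cluster):

    val conf = new org.apache.spark.SparkConf()
      .setMaster("spark://10.125.21.15:7070")
      .setAppName("parallel-demo")
      .set("spark.cores.max", "4")   // cap on cores taken from the standalone cluster
    val sc = new org.apache.spark.SparkContext(conf)
    // The RDD needs at least as many partitions as cores,
    // otherwise there is nothing to run in parallel.
    val docs = sc.textFile("hdfs:///docs", 4)
    println(docs.partitions.length)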
Re: Testing if an RDD is empty?
I think Sampo's intent is to get a function that only tests whether an RDD is empty. He does not want to know the size of the RDD, and getting the size of an RDD is expensive for large data sets. I myself have seen many times that my app threw exceptions because an empty RDD cannot be saved. This is not a big issue, but it is annoying. A cheap way of testing whether an RDD is empty would be nice if there is no such thing available now.
Re: different akka versions and spark
and to make things even more interesting: The CDH *5.3* version of Spark 1.2 differs from the Apache Spark 1.2 release in using Akka version 2.2.3, the version used by Spark 1.1 and CDH 5.2. Apache Spark 1.2 uses Akka version 2.3.4. so i just compiled a program that uses akka against apache spark 1.2.0, and it indeed does not run on CDH5.3.0. i get class incompatibility errors. On Tue, Jan 6, 2015 at 10:29 AM, Koert Kuipers ko...@tresata.com wrote: if the classes are in the original location than i think its safe to say that this makes it impossible for us to build one app that can run against spark 1.0.x, 1.1.x and spark 1.2.x. thats no big deal, but it does beg the question of what compatibility can reasonably be expected for spark 1.x series. i have seen a lot of focus on backwards compatibility of the spark 1.x api, but to me thats kind of a moot point if i cannot run apps against all 1.x versions anyhow since dependencies are not compatible. On Mon, Jan 5, 2015 at 5:08 PM, Marcelo Vanzin van...@cloudera.com wrote: Spark doesn't really shade akka; it pulls a different build (kept under the org.spark-project.akka group and, I assume, with some build-time differences from upstream akka?), but all classes are still in the original location. The upgrade is a little more unfortunate than just changing akka, since it also changes some transitive dependencies which also have compatibility issues (e.g. the typesafe config library). But I believe it's needed to support Scala 2.11... On Mon, Jan 5, 2015 at 8:27 AM, Koert Kuipers ko...@tresata.com wrote: since spark shaded akka i wonder if it would work, but i doubt it On Mon, Jan 5, 2015 at 9:56 AM, Cody Koeninger c...@koeninger.org wrote: I haven't tried it with spark specifically, but I've definitely run into problems trying to depend on multiple versions of akka in one project. On Sat, Jan 3, 2015 at 11:22 AM, Koert Kuipers ko...@tresata.com wrote: hey Ted, i am aware of the upgrade efforts for akka. however if spark 1.2 forces me to upgrade all our usage of akka to 2.3.x while spark 1.0 and 1.1 force me to use akka 2.2.x then we cannot build one application that runs on all spark 1.x versions, which i would consider a major incompatibility. best, koert On Sat, Jan 3, 2015 at 12:11 AM, Ted Yu yuzhih...@gmail.com wrote: Please see http://akka.io/news/2014/05/22/akka-2.3.3-released.html which points to http://doc.akka.io/docs/akka/2.3.3/project/migration-guide-2.2.x-2.3.x.html?_ga=1.35212129.1385865413.1420220234 Cheers On Fri, Jan 2, 2015 at 9:11 AM, Koert Kuipers ko...@tresata.com wrote: i noticed spark 1.2.0 bumps the akka version. since spark uses it's own akka version, does this mean it can co-exist with another akka version in the same JVM? has anyone tried this? we have some spark apps that also use akka (2.2.3) and spray. if different akka versions causes conflicts then spark 1.2.0 would not be backwards compatible for us... thanks. koert -- Marcelo
Re: save spark streaming output to single file on hdfs
thanks for the replies. very useful.
spark streaming python files not packaged in assembly jar
Hi all, just discovered that the streaming folder in pyspark is not included in the assembly jar (spark-assembly-1.2.0-hadoop2.3.0.jar), but included in the python folder. Any reason why? Thanks,
RE: Executor parameter doesn't work for Spark-shell on EMR Yarn
I figure out the second question, because if I don't pass in the num of partition for the test data, it will by default assume has max executors (although I don't know what is this default max num). val lines = sc.parallelize(List(-240990|161327,9051480,0,2,30.48,75, -240990|161324,9051480,0,2,30.48,75),2) will only trigger 2 executors. So I think the default executors num will be decided by the first RDD operation need to send to executors. This give me a weird way to control the num of executors (a fake/test code piece run to kick off the executors first, then run the real behavior - because executor will run the whole lifecycle of the applications? Although this may not have any real value in practice J But I still need help for my first question. Thanks a lot. Regards, Shuai From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Thursday, January 15, 2015 4:03 PM To: user@spark.apache.org Subject: RE: Executor parameter doesn't work for Spark-shell on EMR Yarn Forget to mention, I use EMR AMI 3.3.1, Spark 1.2.0. Yarn 2.4. The spark is setup by the standard script: s3://support.elasticmapreduce/spark/install-spark From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Thursday, January 15, 2015 3:52 PM To: user@spark.apache.org Subject: Executor parameter doesn't work for Spark-shell on EMR Yarn Hi All, I am testing Spark on EMR cluster. Env is a one node cluster r3.8xlarge. Has 32 vCore and 244G memory. But the command line I use to start up spark-shell, it can't work. For example: ~/spark/bin/spark-shell --jars /home/hadoop/vrisc-lib/aws-java-sdk-1.9.14/lib/*.jar --num-executors 6 --executor-memory 10G Neither num-executors nor memory setup works. And more interesting, if I use test code: val lines = sc.parallelize(List(-240990|161327,9051480,0,2,30.48,75, -240990|161324,9051480,0,2,30.48,75)) var count = lines.mapPartitions(dynamoDBBatchWriteFunc).collect.sum It will start 32 executors (then I assume it try to start all executors for every vCore). But if I use some real data to do it (the file size is 200M): val lines = sc.textFile(s3://.../part-r-0) var count = lines.mapPartitions(dynamoDBBatchWriteFunc).collect.sum It will only start 4 executors, which map to the number of HDFS split (200M will have 4 splits). So I have two questions: 1, Why the setup parameter is ignored by Yarn? How can I limit the number of executors I can run? 2, Why my much smaller test data set will trigger 32 executors but my real 200M data set will only have 4 executors? So how should I control the executor setup on the spark-shell? And I print the sparkConf, it looks like much less than I expect, and I don't see my pass in parameter show there. 
scala sc.getConf.getAll.foreach(println) (spark.tachyonStore.folderName,spark-af0c4d42-fe4d-40b0-a3cf-25b6a9e16fa0) (spark.app.id,local-1421353031552) (spark.eventLog.enabled,true) (spark.executor.id,driver) (spark.repl.class.uri,http://10.181.82.38:58415) (spark.driver.host,ip-10-181-82-38.ec2.internal) (spark.executor.extraJavaOptions,-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70) (spark.app.name,Spark shell) (spark.fileserver.uri,http://10.181.82.38:54666) (spark.jars,file:/home/hadoop/vrisc-lib/aws-java-sdk-1.9.14/lib/aws-java-sdk -1.9.14.jar) (spark.eventLog.dir,hdfs:///spark-logs) (spark.executor.extraClassPath,/home/hadoop/spark/classpath/emr/*:/home/hado op/spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hado op/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar) (spark.master,local[*]) (spark.driver.port,54191) (spark.driver.extraClassPath,/home/hadoop/spark/classpath/emr/*:/home/hadoop /spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop /.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar) I search the old threads, attached email answer the question about why vCore setup doesn't work. But I think this is not same issue as me. Otherwise then default Yarn Spark setup can't do any adjustment? Regards, Shuai
RE: using hiveContext to select a nested Map-data-type from an AVROmodel+parquet file
Hi, BB Ideally you can do the query like: select key, value.percent from mytable_data lateral view explode(audiences) f as key, value limit 3; But there is a bug in HiveContext: https://issues.apache.org/jira/browse/SPARK-5237 I am working on it now, hopefully make a patch soon. Cheng Hao -Original Message- From: BB [mailto:bagme...@gmail.com] Sent: Friday, January 16, 2015 12:52 AM To: user@spark.apache.org Subject: using hiveContext to select a nested Map-data-type from an AVROmodel+parquet file Hi all, Any help on the following is very much appreciated. = Problem: On a schemaRDD read from a parquet file (data within file uses AVRO model) using the HiveContext: I can't figure out how to 'select' or use 'where' clause, to filter rows on a field that has a Map AVRO-data-type. I want to do a filtering using a given ('key' : 'value'). How could I do this? Details: * the printSchema of the loaded schemaRDD is like so: -- output snippet - |-- created: long (nullable = false) |-- audiences: map (nullable = true) ||-- key: string ||-- value: struct (valueContainsNull = false) |||-- percent: float (nullable = false) |||-- cluster: integer (nullable = false) - * I dont get a result when I try to select on a specific value of the 'audience' like so: SELECT created, audiences FROM mytable_data LATERAL VIEW explode(audiences) adtab AS adcol WHERE audiences['key']=='tg_loh' LIMIT 10 sequence of commands on the spark-shell (a different query and output) is: -- code snippet - scala val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) scala val parquetFile2 = hiveContext.parquetFile(/home/myuser/myparquetfile) scala parquetFile2.registerTempTable(mytable_data) scala hiveContext.cacheTable(mytable_data) scala hiveContext.sql(SELECT audiences['key'], audiences['value'] scala FROM mytable_data LATERAL VIEW explode(audiences) adu AS audien LIMIT 3).collect().foreach(println) -- output - [null,null] [null,null] [null,null] gives a list of nulls. I can see that there is data when I just do the following (output is truncated): -- code snippet - scala hiveContext.sql(SELECT audiences FROM mytable_data LATERAL VIEW explode(audiences) tablealias AS colalias LIMIT 1).collect().foreach(println) output -- [Map(tg_loh - [0.0,1,Map()], tg_co - [0.0,1,Map(tg_co_petrol - 0.0)], tg_wall - [0.0,1,Map(tg_wall_poi - 0.0)], ... Q1) What am I doing wrong? Q2) How can I use 'where' in the query to filter on specific values? What works: Queries with filtering, and selecting on fields that have simple AVRO data-types, such as long or string works fine. === I hope the explanation makes sense. Thanks. Best, BB -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/using-hiveContext-to-select-a-nested-Map-data-type-from-an-AVROmodel-parquet-file-tp21168.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
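A sketch of running the query suggested above through the HiveContext (subject to the SPARK-5237 fix mentioned; the table and file names follow the thread, and sc is an existing SparkContext):

    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    hiveContext.parquetFile("/home/myuser/myparquetfile").registerTempTable("mytable_data")
    // Explode the map column into (key, value) rows and reach into the value struct
    hiveContext.sql(
      """SELECT key, value.percent
        |FROM mytable_data
        |LATERAL VIEW explode(audiences) f AS key, value
        |LIMIT 3""".stripMargin
    ).collect().foreach(println)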
Re: saveAsTextFile
I have seen this happen when the RDD contains null values. Essentially, saveAsTextFile calls toString() on the elements of the RDD, so a call to null.toString will result in an NPE.
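A small sketch of the usual guard (records is a placeholder RDD name and the output path is made up):

    // Drop nulls (or map them to a sentinel string) before writing,
    // since saveAsTextFile will call toString() on every element.
    val safe = records.filter(_ != null)
    safe.saveAsTextFile("hdfs:///out/records")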
Anyway to make RDD preserve input directories structures?
say there are some logs: s3://log-collections/sys1/20141212/nginx.gz s3://log-collections/sys1/20141213/nginx-part-1.gz s3://log-collections/sys1/20141213/nginx-part-2.gz I have a function that parses the logs for later analysis, and I want to parse all the files. So I do this: logs = sc.textFile('s3://log-collections/sys1/') logs.map(parse).saveAsTextFile('s3://parsed-logs/') BUT this destroys the date-separated naming schema, resulting in: s3://parsed-logs/part- s3://parsed-logs/part-0001 ... And the worse part is when a new day's logs arrive: it seems rdd.saveAsTextFile can't just append the new day's logs. So I create an RDD for every single file, parse it, and save it to the name I want, like this: one = sc.textFile(s3://log-collections/sys1/20141213/nginx-part-1.gz) one.map(parse).saveAsTextFile(s3://parsed-logs/20141213/01/) which results in: s3://parsed-logs/20141212/01/part- s3://parsed-logs/20141213/01/part- s3://parsed-logs/20141213/01/part-0001 s3://parsed-logs/20141213/02/part- s3://parsed-logs/20141213/02/part-0001 s3://parsed-logs/20141213/02/part-0002 And when a new day's logs come, I just process that day's logs and put them in the proper directory (or key). THE PROBLEM is that this way I have to create a separate RDD for every single file, which can't take advantage of Spark's automatic parallel processing. [I'm trying to submit multiple applications, one for each batch of files.] Or maybe I'd better use hadoop streaming for this? Any suggestions?
Re: Testing if an RDD is empty?
Hi, On Fri, Jan 16, 2015 at 7:31 AM, freedafeng freedaf...@yahoo.com wrote: I myself saw many times that my app threw out exceptions because an empty RDD cannot be saved. This is not big issue, but annoying. Having a cheap solution testing if an RDD is empty would be nice if there is no such thing available now. I think the cheapest you can have is computing at least one element in the RDD, which in the case of, say, val maybeEmptyRDD = veryExpensiveRDD.filter(false) will be just as expensive as .count(). Tobias
Re: Executor vs Mapper in Hadoop
An executor is specific to a Spark application, just as a mapper is specific to a MapReduce job. So a machine will usually be running many executors, and each is a JVM. A Mapper is single-threaded; an executor can run many tasks (possibly from different jobs within the application) at once. Yes, 5 executors with 4 cores should be able to process 20 tasks in parallel. In any normal case, you have 1 executor per machine per application. There are cases where you would make more than 1, but these are unusual. On Thu, Jan 15, 2015 at 8:16 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I try to clarify some behavior in the spark for executor. Because I am from Hadoop background, so I try to compare it to the Mapper (or reducer) in hadoop. 1, Each node can have multiple executors, each run in its own process? This is same as mapper process. 2, I thought the spark executor will use multi-thread mode when there are more than 1 core to allocate to it (for example: set executor-cores to 5). In this way, how many partition it can process? For example, if input are 20 partitions (similar as 20 split as mapper input) and we have 5 executors, each has 4 cores. Will all these partitions will be proceed as the same time (so each core process one partition) or actually one executor can only run one partition at the same time? I don’t know whether my understand is correct, please suggest. BTW: In general practice, should we always try to set the executor-cores to a higher number? So we will favor 10 cores * 2 executor than 2 cores*10 executors? Any suggestion here? Thanks! Regards, Shuai - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Testing if an RDD is empty?
How about checking whether take(1).length == 0? If I read the code correctly, this will only examine the first partition, at least. On Fri, Jan 16, 2015 at 4:12 AM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Fri, Jan 16, 2015 at 7:31 AM, freedafeng freedaf...@yahoo.com wrote: I myself saw many times that my app threw out exceptions because an empty RDD cannot be saved. This is not big issue, but annoying. Having a cheap solution testing if an RDD is empty would be nice if there is no such thing available now. I think the cheapest you can have is computing at least one element in the RDD, which in the case of, say, val maybeEmptyRDD = veryExpensiveRDD.filter(false) will be just as expensive as .count(). Tobias - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
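As a small helper along the lines suggested above (just a sketch; later Spark releases added an RDD.isEmpty() method directly):

    def isEmpty[T](rdd: org.apache.spark.rdd.RDD[T]): Boolean =
      rdd.take(1).length == 0

    // Usage sketch:
    // if (!isEmpty(results)) results.saveAsTextFile("hdfs:///out")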
Re: Accumulators
Your understanding is basically correct. Each task creates its own local accumulator, and just those results get merged together. However, there are some performance limitations to be aware of. First, you need enough memory on the executors to build up whatever those intermediate results are. Second, all the work of *merging* the results from each task is done by the *driver*. So if there is a lot of stuff to merge, that can be slow, as it's not distributed at all. Hope that helps a little Imran On Jan 14, 2015 6:21 PM, Corey Nolet cjno...@gmail.com wrote: Just noticed an error in my wording. Should be I'm assuming it's not immediately aggregating on the driver each time I call the += on the Accumulator. On Wed, Jan 14, 2015 at 9:19 PM, Corey Nolet cjno...@gmail.com wrote: What are the limitations of using Accumulators to get a union of a bunch of small sets? Let's say I have an RDD[Map{String,Any}] and I want to do: rdd.map(accumulator += Set(_.get(entityType).get)) What implication does this have on performance? I'm assuming it's not immediately aggregating each time I call the += on the Accumulator. Is it doing a local combine and then occasionally sending the results on the current partition back to the driver?
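A sketch of the pattern being discussed, using accumulableCollection so that each task grows a local set which the driver then merges (assumes an existing SparkContext sc; the "entityType" field name is hypothetical):

    import scala.collection.mutable

    val entityTypes = sc.accumulableCollection(mutable.Set[String]())

    val rdd = sc.parallelize(Seq(
      Map("entityType" -> "user"),
      Map("entityType" -> "order")))

    // Each task adds to its own local accumulator; the per-task results
    // are merged on the driver when the action finishes.
    rdd.foreach { m => entityTypes += m("entityType") }
    println(entityTypes.value)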
Re: Some tasks are taking long time
Thanks Nicos.GC does not contribute much to the execution time of the task. I will debug it further today. Regards,Ajay On Thursday, January 15, 2015 11:55 PM, Nicos n...@hotmail.com wrote: Ajay, Unless we are dealing with some synchronization/conditional variable bug in Spark, try this per tuning guide:Cache Size TuningOne important configuration parameter for GC is the amount of memory that should be used for caching RDDs. By default, Spark uses 60% of the configured executor memory (spark.executor.memory) to cache RDDs. This means that 40% of memory is available for any objects created during task execution.In case your tasks slow down and you find that your JVM is garbage-collecting frequently or running out of memory, lowering this value will help reduce the memory consumption. To change this to, say, 50%, you can call conf.set(spark.storage.memoryFraction, 0.5) on your SparkConf. Combined with the use of serialized caching, using a smaller cache should be sufficient to mitigate most of the garbage collection problems. In case you are interested in further tuning the Java GC, continue reading below. Complete list of tips here:https://spark.apache.org/docs/latest/tuning.html#serialized-rdd-storage Cheers,- Nicos On Jan 15, 2015, at 6:49 AM, Ajay Srivastava a_k_srivast...@yahoo.com.INVALID wrote: Thanks RK. I can turn on speculative execution but I am trying to find out actual reason for delay as it happens on any node. Any idea about the stack trace in my previous mail. Regards,Ajay On Thursday, January 15, 2015 8:02 PM, RK prk...@yahoo.com.INVALID wrote: If you don't want a few slow tasks to slow down the entire job, you can turn on speculation. Here are the speculation settings from Spark Configuration - Spark 1.2.0 Documentation. | | | | | | | | | Spark Configuration - Spark 1.2.0 DocumentationSpark Configuration Spark Properties Dynamically Loading Spark Properties Viewing Spark Properties Available Properties Application Properties Runtime Environment Shuffle Behavior Spark UI | | | | View on spark.apache.org | Preview by Yahoo | | | | | | spark.speculation | false | If set to true, performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched. | | spark.speculation.interval | 100 | How often Spark will check for tasks to speculate, in milliseconds. | | spark.speculation.quantile | 0.75 | Percentage of tasks which must be complete before speculation is enabled for a particular stage. | | spark.speculation.multiplier | 1.5 | How many times slower a task is than the median to be considered for speculation. | On Thursday, January 15, 2015 5:44 AM, Ajay Srivastava a_k_srivast...@yahoo.com.INVALID wrote: Hi, My spark job is taking long time. I see that some tasks are taking longer time for same amount of data and shuffle read/write. What could be the possible reasons for it ? 
The thread-dump sometimes show that all the tasks in an executor are waiting with following stack trace - Executor task launch worker-12 daemon prio=10 tid=0x7fcd44276000 nid=0x3f85 waiting on condition [0x7fcce3ddc000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 0x7fd0aee82e00 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(Unknown Source) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source) at java.util.concurrent.LinkedBlockingQueue.take(Unknown Source) at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:253) at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:77) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at
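The archive stripped the quotes from the conf.set call quoted from the tuning guide above; as a sketch, the setting is applied like this before the SparkContext is created (app name is a placeholder):

    val conf = new org.apache.spark.SparkConf()
      .setAppName("my-job")
      .set("spark.storage.memoryFraction", "0.5")   // default is 0.6
    val sc = new org.apache.spark.SparkContext(conf)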
OutOfMemory error in Spark Core
We have our Analytics App built on Spark 1.1 Core, Parquet, Avro and Spray. We are using Kryo serializer for the Avro objects read from Parquet and we are using our custom Kryo registrator (along the lines of ADAM https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala#L51 , we just added batched writes and flushes to Kryo's Output for each 512 MB in the stream, as below outstream.array.sliding(512MB).foreach(buf = { kryoOut.write(buf) kryoOut.flush() }) ) Our queries are done to a cached RDD(MEMORY_ONLY), that is obtained after 1. loading bulk data from Parquet 2. union-ing it with incremental data in Avro 3. doing timestamp based duplicate removal (including partitioning in reduceByKey) and 4. joining a couple of MySQL tables using JdbcRdd Of late, we are seeing major instabilities where the app crashes on a lost executor which itself failed due to a OutOfMemory error as below. This looks almost identical to https://issues.apache.org/jira/browse/SPARK-4885 even though we are seeing this error in Spark 1.1 2015-01-15 20:12:51,653 [handle-message-executor-13] ERROR org.apache.spark.executor.ExecutorUncaughtExceptionHandler - Uncaught exception in thread Thread[handle-message-executor-13,5,main] java.lang.OutOfMemoryError: Requested array size exceeds VM limit at java.util.Arrays.copyOf(Arrays.java:2271) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) at com.esotericsoftware.kryo.io.Output.flush(Output.java:155) at com.esotericsoftware.kryo.io.Output.require(Output.java:135) at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220) at com.esotericsoftware.kryo.io.Output.write(Output.java:183) at com.philips.hc.eici.analytics.streamingservice.AvroSerializer$$anonfun$write$1.apply(AnalyticsKryoRegistrator.scala:31) at com.philips.hc.eici.analytics.streamingservice.AvroSerializer$$anonfun$write$1.apply(AnalyticsKryoRegistrator.scala:30) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at com.philips.hc.eici.analytics.streamingservice.AvroSerializer.write(AnalyticsKryoRegistrator.scala:30) at com.philips.hc.eici.analytics.streamingservice.AvroSerializer.write(AnalyticsKryoRegistrator.scala:18) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:119) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1047) at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1056) at org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:154) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:421) at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:387) at 
org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:100) at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:79) at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48) at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) The driver log is as below 15/01/15 12:12:53 ERROR scheduler.DAGSchedulerActorSupervisor: eventProcesserActor failed; shutting down SparkContext java.util.NoSuchElementException: key not found: 2539 at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.mutable.HashMap.apply(HashMap.scala:64) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:769) at
Spark SQL Custom Predicate Pushdown
I have document storage services in Accumulo that I'd like to expose to Spark SQL. I am able to push down predicate logic to Accumulo to have it perform only the seeks necessary on each tablet server to grab the results being asked for. I'm interested in using Spark SQL to push those predicates down to the tablet servers. Where would I begin my implementation? Currently I have an input format which accepts a query object that gets pushed down. How would I extract this information from the HiveContext/SQLContext to be able to push this down?
Re: Low Level Kafka Consumer for Spark
Hi Dibyendu, I am using kafka 0.8.1.1 and spark 1.2.0. After modifying these version of your pom, I have rebuilt your codes. But I have not got any messages from ssc.receiverStream(new KafkaReceiver(_props, i)). I have found, in your codes, all the messages are retrieved correctly, but _receiver.store(_dataBuffer.iterator()) which is spark streaming abstract class's method does not seem to work correctly. Have you tried running your spark streaming kafka consumer with kafka 0.8.1.1 and spark 1.2.0 ? - Kidong. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Low Level Kafka Consumer for Spark
Hi Kidong, No , I have not tried yet with Spark 1.2 yet. I will try this out and let you know how this goes. By the way, is there any change in Receiver Store method happened in Spark 1.2 ? Regards, Dibyendu On Fri, Jan 16, 2015 at 11:25 AM, mykidong mykid...@gmail.com wrote: Hi Dibyendu, I am using kafka 0.8.1.1 and spark 1.2.0. After modifying these version of your pom, I have rebuilt your codes. But I have not got any messages from ssc.receiverStream(new KafkaReceiver(_props, i)). I have found, in your codes, all the messages are retrieved correctly, but _receiver.store(_dataBuffer.iterator()) which is spark streaming abstract class's method does not seem to work correctly. Have you tried running your spark streaming kafka consumer with kafka 0.8.1.1 and spark 1.2.0 ? - Kidong. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: OutOfMemory error in Spark Core
Did you try increasing the parallelism? Thanks Best Regards On Fri, Jan 16, 2015 at 10:41 AM, Anand Mohan chinn...@gmail.com wrote: We have our Analytics App built on Spark 1.1 Core, Parquet, Avro and Spray. We are using Kryo serializer for the Avro objects read from Parquet and we are using our custom Kryo registrator (along the lines of ADAM https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala#L51 , we just added batched writes and flushes to Kryo's Output for each 512 MB in the stream, as below outstream.array.sliding(512MB).foreach(buf = { kryoOut.write(buf) kryoOut.flush() }) ) Our queries are done to a cached RDD(MEMORY_ONLY), that is obtained after 1. loading bulk data from Parquet 2. union-ing it with incremental data in Avro 3. doing timestamp based duplicate removal (including partitioning in reduceByKey) and 4. joining a couple of MySQL tables using JdbcRdd Of late, we are seeing major instabilities where the app crashes on a lost executor which itself failed due to a OutOfMemory error as below. This looks almost identical to https://issues.apache.org/jira/browse/SPARK-4885 even though we are seeing this error in Spark 1.1 2015-01-15 20:12:51,653 [handle-message-executor-13] ERROR org.apache.spark.executor.ExecutorUncaughtExceptionHandler - Uncaught exception in thread Thread[handle-message-executor-13,5,main] java.lang.OutOfMemoryError: Requested array size exceeds VM limit at java.util.Arrays.copyOf(Arrays.java:2271) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) at com.esotericsoftware.kryo.io.Output.flush(Output.java:155) at com.esotericsoftware.kryo.io.Output.require(Output.java:135) at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220) at com.esotericsoftware.kryo.io.Output.write(Output.java:183) at com.philips.hc.eici.analytics.streamingservice.AvroSerializer$$anonfun$write$1.apply(AnalyticsKryoRegistrator.scala:31) at com.philips.hc.eici.analytics.streamingservice.AvroSerializer$$anonfun$write$1.apply(AnalyticsKryoRegistrator.scala:30) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at com.philips.hc.eici.analytics.streamingservice.AvroSerializer.write(AnalyticsKryoRegistrator.scala:30) at com.philips.hc.eici.analytics.streamingservice.AvroSerializer.write(AnalyticsKryoRegistrator.scala:18) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:119) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1047) at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1056) at org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:154) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:421) at 
org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:387) at org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:100) at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:79) at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48) at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) The driver log is as below 15/01/15 12:12:53 ERROR scheduler.DAGSchedulerActorSupervisor: eventProcesserActor failed; shutting down SparkContext java.util.NoSuchElementException: key not found: 2539 at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.mutable.HashMap.apply(HashMap.scala:64) at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:799)
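In code, "increasing the parallelism" typically means something like the sketch below (the number 200 is just a placeholder to tune; smaller partitions mean smaller blocks to serialize, which helps avoid a single buffer hitting the JVM array-size limit):

    val conf = new org.apache.spark.SparkConf()
      .set("spark.default.parallelism", "200")
    // or give the shuffle an explicit partition count, e.g.
    //   rdd.reduceByKey(mergeFn, 200)
    // or repartition the RDD before caching it:
    //   val finer = bigRdd.repartition(200)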
Re: How to query Spark master for cluster status ?
There's a JSON endpoint in the Web UI (the one running on port 8080): http://masterip:8080/json/ Thanks Best Regards On Thu, Jan 15, 2015 at 6:30 PM, Shing Hing Man mat...@yahoo.com.invalid wrote: Hi, I am using Spark 1.2. The Spark master UI has a status. Is there a web service on the Spark master that returns the status of the cluster in Json ? Alternatively, what is the best way to determine if a cluster is up. Thanks in advance for your assistance! Shing
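A quick way to poll that endpoint from Scala (the master host below is a placeholder):

    // The response contains the master state plus workers and
    // running/completed applications as JSON.
    val json = scala.io.Source.fromURL("http://masterip:8080/json/").mkString
    println(json)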
Re: PySpark saveAsTextFile gzip
You can use the saveAsNewAPIHadoop http://spark.apache.org/docs/1.1.0/api/python/pyspark.rdd.RDD-class.html#saveAsNewAPIHadoopFile file. You can use it for compressing your output, here's a sample code https://github.com/ScrapCodes/spark-1/blob/master/python/pyspark/tests.py#L1225 to use the API. Thanks Best Regards On Thu, Jan 15, 2015 at 5:16 PM, Tom Seddon mr.tom.sed...@gmail.com wrote: Hi, I've searched but can't seem to find a PySpark example. How do I write compressed text file output to S3 using PySpark saveAsTextFile? Thanks, Tom
Re: Low Level Kafka Consumer for Spark
Hi Kidong, Just now I tested the Low Level Consumer with Spark 1.2 and I did not see any issue with Receiver.Store method . It is able to fetch messages form Kafka. Can you cross check other configurations in your setup like Kafka broker IP , topic name, zk host details, consumer id etc. Dib On Fri, Jan 16, 2015 at 11:50 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi Kidong, No , I have not tried yet with Spark 1.2 yet. I will try this out and let you know how this goes. By the way, is there any change in Receiver Store method happened in Spark 1.2 ? Regards, Dibyendu On Fri, Jan 16, 2015 at 11:25 AM, mykidong mykid...@gmail.com wrote: Hi Dibyendu, I am using kafka 0.8.1.1 and spark 1.2.0. After modifying these version of your pom, I have rebuilt your codes. But I have not got any messages from ssc.receiverStream(new KafkaReceiver(_props, i)). I have found, in your codes, all the messages are retrieved correctly, but _receiver.store(_dataBuffer.iterator()) which is spark streaming abstract class's method does not seem to work correctly. Have you tried running your spark streaming kafka consumer with kafka 0.8.1.1 and spark 1.2.0 ? - Kidong. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Spark SQL Custom Predicate Pushdown
The Data Source API should probably work for this purpose. It supports column pruning and predicate push-down: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala Examples can also be found in the unit tests: https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/sources From: Corey Nolet [mailto:cjno...@gmail.com] Sent: Friday, January 16, 2015 1:51 PM To: user Subject: Spark SQL Custom Predicate Pushdown I have document storage services in Accumulo that I'd like to expose to Spark SQL. I am able to push down predicate logic to Accumulo to have it perform only the seeks necessary on each tablet server to grab the results being asked for. I'm interested in using Spark SQL to push those predicates down to the tablet servers. Where would I begin my implementation? Currently I have an input format which accepts a query object that gets pushed down. How would I extract this information from the HiveContext/SQLContext to be able to push this down?
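A very rough sketch of what such a relation looks like in the Spark 1.2.0 data sources API (later releases split these into BaseRelation plus a scan trait, so the exact shapes differ slightly; the Accumulo-specific translation of filters into server-side seeks is elided):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql._
    import org.apache.spark.sql.sources.{Filter, PrunedFilteredScan}

    case class AccumuloDocumentRelation(table: String)(@transient val sqlContext: SQLContext)
      extends PrunedFilteredScan {

      // Describe the document fields exposed to Spark SQL here
      override def schema: StructType = ???

      // requiredColumns gives the pruned projection; filters holds the
      // predicates (EqualTo, GreaterThan, In, ...) that Spark SQL is willing
      // to push down - build the Accumulo ranges/iterators from them and
      // wrap the resulting scan in an RDD[Row].
      override def buildScan(requiredColumns: Array[String],
                             filters: Array[Filter]): RDD[Row] = ???
    }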
UserGroupInformation: PriviledgedActionException as
Hi, When I try to run a program on a remote Spark machine I get the exception below: 15/01/16 11:14:39 ERROR UserGroupInformation: PriviledgedActionException as:user1 (auth:SIMPLE) cause:java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] Exception in thread main java.lang.reflect.UndeclaredThrowableException: Unknown exception in doAs at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1421) at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59) at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:115) at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:163) at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala) Caused by: java.security.PrivilegedActionException: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408) ... 4 more Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:127) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59) I am executing this driver class on one machine, say (abc), but Spark is installed on another machine (xyz:7077). When I execute the same driver class pointing to local it works fine.

    public class TestSpark implements Serializable {
      public static void main(String[] args) throws IOException {
        TestSpark test = new TestSpark();
        test.testReduce();
      }
      public void testReduce() throws IOException {
        SparkConf conf = new SparkConf().setMaster("spark://xyz:7077").setAppName("Sample App");
        String[] pathToJar = {"/home/user1/Desktop/Jars/TestSpark.jar"};
        //SparkConf conf = new SparkConf().setMaster("spark://abc:7077").setAppName("Sample App").setJars(pathToJar);
        //SparkConf conf = new SparkConf().setMaster("local").setAppName("Sample App");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        List<Integer> data = new ArrayList<Integer>();
        for (int i = 1; i < 500; i++) {
          data.add(i);
        }
        System.out.println("Size : " + data.size());
        JavaRDD<Integer> distData = jsc.parallelize(data);
        Integer total = distData.reduce(new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer v1, Integer v2) throws Exception {
            String s1 = "v1 : " + v1 + " v2 : " + v2 + " -- " + this;
            return v1 + v2;
          }
        });
        System.out.println("--" + total);
      }
    }

I have also tried setting spark.driver.host :: xyz spark.driver.port :: 7077 while creating the spark context, but it did not help. Please advise.
Re: Low Level Kafka Consumer for Spark
There was a simple example https://github.com/dibbhatt/kafka-spark-consumer/blob/master/examples/scala/LowLevelKafkaConsumer.scala#L45 which you can run after changing few lines of configurations. Thanks Best Regards On Fri, Jan 16, 2015 at 12:23 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi Kidong, Just now I tested the Low Level Consumer with Spark 1.2 and I did not see any issue with Receiver.Store method . It is able to fetch messages form Kafka. Can you cross check other configurations in your setup like Kafka broker IP , topic name, zk host details, consumer id etc. Dib On Fri, Jan 16, 2015 at 11:50 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi Kidong, No , I have not tried yet with Spark 1.2 yet. I will try this out and let you know how this goes. By the way, is there any change in Receiver Store method happened in Spark 1.2 ? Regards, Dibyendu On Fri, Jan 16, 2015 at 11:25 AM, mykidong mykid...@gmail.com wrote: Hi Dibyendu, I am using kafka 0.8.1.1 and spark 1.2.0. After modifying these version of your pom, I have rebuilt your codes. But I have not got any messages from ssc.receiverStream(new KafkaReceiver(_props, i)). I have found, in your codes, all the messages are retrieved correctly, but _receiver.store(_dataBuffer.iterator()) which is spark streaming abstract class's method does not seem to work correctly. Have you tried running your spark streaming kafka consumer with kafka 0.8.1.1 and spark 1.2.0 ? - Kidong. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
MatchError in JsonRDD.toLong
Hi, I am experiencing a weird error that suddenly popped up in my unit tests. I have a couple of HDFS files in JSON format and my test is basically creating a JsonRDD and then issuing a very simple SQL query over it. This used to work fine, but now suddenly I get: 15:58:49.039 [Executor task launch worker-1] ERROR executor.Executor - Exception in task 1.0 in stage 29.0 (TID 117) scala.MatchError: 14452800566866169008 (of class java.math.BigInteger) at org.apache.spark.sql.json.JsonRDD$.toLong(JsonRDD.scala:282) at org.apache.spark.sql.json.JsonRDD$.enforceCorrectType(JsonRDD.scala:353) at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$asRow$1$$anonfun$apply$12.apply(JsonRDD.scala:381) at scala.Option.map(Option.scala:145) at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$asRow$1.apply(JsonRDD.scala:380) at org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$asRow$1.apply(JsonRDD.scala:365) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.sql.json.JsonRDD$.org$apache$spark$sql$json$JsonRDD$$asRow(JsonRDD.scala:365) at org.apache.spark.sql.json.JsonRDD$$anonfun$jsonStringToRow$1.apply(JsonRDD.scala:38) at org.apache.spark.sql.json.JsonRDD$$anonfun$jsonStringToRow$1.apply(JsonRDD.scala:38) ... java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) The stack trace contains none of my classes, so it's a bit hard to track down where this starts. The code of JsonRDD.toLong is in fact private def toLong(value: Any): Long = { value match { case value: java.lang.Integer = value.asInstanceOf[Int].toLong case value: java.lang.Long = value.asInstanceOf[Long] } } so if value is a BigInteger, toLong doesn't work. Now I'm wondering where this comes from (I haven't touched this component in a while, nor upgraded Spark etc.), but in particular I would like to know how to work around this. Thanks Tobias
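One possible workaround sketch is to supply an explicit schema so the offending field is read as a string instead of being inferred as an integral type (the field name and path below are placeholders; this uses the Spark 1.2 jsonRDD overload that accepts a schema):

    import org.apache.spark.sql._

    val sqlContext = new SQLContext(sc)
    // Field whose values can exceed Long is pinned to StringType here
    val schema = StructType(Seq(
      StructField("id", StringType, nullable = true)))
    val rows = sqlContext.jsonRDD(sc.textFile("hdfs:///data/events.json"), schema)
    rows.registerTempTable("events")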
Re: SparkSQL schemaRDD MapPartitions calls - performance issues - columnar formats?
Thanks Cheng! Is there any API I can get access too (e.g. ParquetTableScan) which would allow me to load up the low level/baseRDD of just RDD[Row] so I could avoid the defensive copy (maybe lose our on columnar storage etc.). We have parts of our pipeline using SparkSQL/SchemaRDDs and others using the core RDD api (mapPartitions etc.). Any tips? Out of curiosity, a lot of SparkSQL functions seem to run in a mapPartiton (e.g. Distinct). Does a defensive copy happen there too? Keen to get the best performance and the best blend of SparkSQL and functional Spark. Cheers, Nathan From: Cheng Lian lian.cs@gmail.commailto:lian.cs@gmail.com Date: Monday, 12 January 2015 1:21 am To: Nathan nathan.mccar...@quantium.com.aumailto:nathan.mccar...@quantium.com.au, Michael Armbrust mich...@databricks.commailto:mich...@databricks.com Cc: user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: SparkSQL schemaRDD MapPartitions calls - performance issues - columnar formats? On 1/11/15 1:40 PM, Nathan McCarthy wrote: Thanks Cheng Michael! Makes sense. Appreciate the tips! Idiomatic scala isn't performant. I’ll definitely start using while loops or tail recursive methods. I have noticed this in the spark code base. I might try turning off columnar compression (via spark.sql.inMemoryColumnarStorage.compressed=false correct?) and see how performance compares to the primitive objects. Would you expect to see similar runtimes vs the primitive objects? We do have the luxury of lots of memory at the moment so this might give us an additional performance boost. Turning off compression should be faster, but still slower than directly using primitive objects. Because Spark SQL also serializes all objects within a column into byte buffers in a compact format. However, this radically reduces number of Java objects in the heap and is more GC friendly. When running large queries, cost introduced by GC can be significant. Regarding the defensive copying of row objects. Can we switch this off and just be aware of the risks? Is MapPartitions on SchemaRDDs and operating on the Row object the most performant way to be flipping between SQL Scala user code? Is there anything else I could be doing? This can be very dangerous and error prone. Whenever an operator tries to cache row objects, turning off defensive copying can introduce wrong query result. For example, sort-based shuffle caches rows to do sorting. In some cases, sample operator may also cache row objects. This is very implementation specific and may change between versions. Cheers, ~N From: Michael Armbrust mich...@databricks.commailto:mich...@databricks.com Date: Saturday, 10 January 2015 3:41 am To: Cheng Lian lian.cs@gmail.commailto:lian.cs@gmail.com Cc: Nathan nathan.mccar...@quantium.com.aumailto:nathan.mccar...@quantium.com.au, user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: SparkSQL schemaRDD MapPartitions calls - performance issues - columnar formats? The other thing to note here is that Spark SQL defensively copies rows when we switch into user code. This probably explains the difference between 1 2. The difference between 1 3 is likely the cost of decompressing the column buffers vs. accessing a bunch of uncompressed primitive objects. On Fri, Jan 9, 2015 at 6:59 AM, Cheng Lian lian.cs@gmail.commailto:lian.cs@gmail.com wrote: Hey Nathan, Thanks for sharing, this is a very interesting post :) My comments are inlined below. 
Cheng On 1/7/15 11:53 AM, Nathan McCarthy wrote: Hi, I'm trying to use a combination of SparkSQL and 'normal' Spark/Scala via rdd.mapPartitions(...). Using the latest release, 1.2.0. Simple example; load up some sample data from Parquet on HDFS (about 380m rows, 10 columns) on a 7 node cluster.

val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet")
t.registerTempTable("test1")
sqlC.cacheTable("test1")

Now let's do some operations on it; I want the total sales quantities sold for each hour in the day, so I choose 3 out of the 10 possible columns...

sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group by Hour").collect().foreach(println)

After the table has been 100% cached in memory, this takes around 11 seconds. Let's do the same thing but via a mapPartitions call (this isn't production ready code but gets the job done).

val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1")
try2.mapPartitions { case hrs =>
  val qtySum = new Array[Double](24)
  val salesSum = new Array[Double](24)
  for (r <- hrs) {
    val hr = r.getInt(0)
    qtySum(hr) += r.getDouble(1)
    salesSum(hr) += r.getDouble(2)
  }
  (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
}.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)

I believe the evil thing that makes this snippet much slower is the for-loop.
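(For reference, a minimal sketch of the same per-partition aggregation written with an explicit while loop over the iterator, reusing the try2 SchemaRDD defined above; this avoids the extra closure level the for-comprehension introduces.)

try2.mapPartitions { hrs =>
  val qtySum = new Array[Double](24)
  val salesSum = new Array[Double](24)
  while (hrs.hasNext) {              // plain while loop: no anonymous class wrapped around the body
    val r = hrs.next()
    val hr = r.getInt(0)
    qtySum(hr) += r.getDouble(1)
    salesSum(hr) += r.getDouble(2)
  }
  (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
}.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)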
Re: Serializability: for vs. while loops
Scala for-loops are implemented as closures using anonymous inner classes which are instantiated once and invoked many times. This means, though, that the code inside the loop is actually sitting inside a class, which confuses Spark's Closure Cleaner, whose job is to remove unused references from closures to make otherwise-unserializable objects serializable. My understanding is, in particular, that the closure cleaner will null out unused fields in the closure, but cannot go past the first level of depth (i.e., it will not follow field references and null out *their* unused, and possibly unserializable, references), because this could end up mutating state outside of the closure itself. Thus, the extra level of depth of the closure that was introduced by the anonymous class (where presumably the outer "this" pointer is considered used by the closure cleaner) is sufficient to make it unserializable. While loops, on the other hand, involve none of this trickery, and everyone is happy. On Wed, Jan 14, 2015 at 11:37 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, sorry, I don't like questions about serializability myself, but still... Can anyone give me a hint why

for (i <- 0 to (maxId - 1)) { ... }

throws a NotSerializableException in the loop body, while

var i = 0
while (i < maxId) {
  // same code as in the for loop
  i += 1
}

works fine? I guess there is something fundamentally different in the way Scala realizes for loops? Thanks Tobias
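(As a rough illustration of the difference described above, a sketch rather than a repro from the thread: the for form is rewritten into a foreach over a Range, which is where the extra closure level comes from.)

// Roughly what the compiler does with each loop (illustrative only):
// the for-comprehension body becomes a function value passed to foreach,
// i.e. one extra closure level wrapped around any Spark closure inside it.
val maxId = 10

// for (i <- 0 to (maxId - 1)) { body }  is rewritten to:
(0 to (maxId - 1)).foreach { i =>
  // body: any rdd.map(...) defined here lives inside an anonymous
  // Function1 instance, one level deeper than the enclosing method
}

// The while form keeps the body in the enclosing method, so the closure
// cleaner only has one level of outer references to deal with:
var i = 0
while (i < maxId) {
  // same body
  i += 1
}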
Visualize Spark Job
Hi, I want to visualize tasks and stages in order to analyze Spark jobs. I know the necessary metrics are written to spark.eventLog.dir. Does anyone know of a tool like the swimlanes view in Tez? Regards, Nobuyuki Kuromatsu
Error connecting to localhost:8060: java.net.ConnectException: Connection refused
Hi, I'm new to Apache Storm. I'm receiving data at my UDP port 8060 and I want to capture it and perform some operations in real time, for which I'm using Spark Streaming. While the code seems to be correct, I get the following output: https://gist.github.com/d34th4ck3r/0e88896eac864d6d7193 I'm using the following command: mvn -e -Dmaven.tomcat.port=8080 tomcat:run exec:java -Dexec.mainClass=twoGrams.Main Also, $ netstat -pn | grep 8060 returns nothing, hence the port is free. I think the problem may be that the interval within which Storm tries to connect to the port is too short, but I'm not sure how to change it, or whether that actually is the problem. Any help would be appreciated. Thanks, Gautam
Re: Fast HashSets HashMaps - Spark Collection Utils
A recent discussion says these won't be public. However, there are many optimized collection libs in Java. My favorite is Koloboke: https://github.com/OpenHFT/Koloboke/wiki/Koloboke:-roll-the-collection-implementation-with-features-you-need Carrot Search's HPPC is good too. The only catch is that the libraries are huge, so you may end up using your build to chop out packages you don't need. Otherwise it's 20+ MB of code. On Jan 15, 2015 4:05 AM, Night Wolf nightwolf...@gmail.com wrote: Hi all, I'd like to leverage some of the fast Spark collection implementations in my own code. Particularly for doing things like distinct counts in a mapPartitions loop. Are there any plans to make the org.apache.spark.util.collection implementations public? Is there any other library out there with similar performance? Cheers, NW
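(A minimal sketch of the per-partition distinct count mentioned above, using an ordinary java.util.HashSet; a primitive-specialized set from Koloboke or HPPC would drop in the same way and avoid boxing. Names are illustrative.)

import org.apache.spark.rdd.RDD

def distinctCountPerPartition(rdd: RDD[Long]): RDD[Int] =
  rdd.mapPartitions { it =>
    // plain HashSet; swap in a primitive long set for better memory behaviour
    val seen = new java.util.HashSet[Long]()
    while (it.hasNext) seen.add(it.next())
    Iterator.single(seen.size)   // one distinct count per partition
  }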
Re: ScalaReflectionException when using saveAsParquetFile in sbt
Same problem here... Did you find a solution for this? P.
spark fault tolerance mechanism
Hi, I'm quite interested in how Spark's fault tolerance works and I'd like to ask a question here. According to the paper, there are two kinds of dependencies -- the wide dependency and the narrow dependency. My understanding is, if the operations I use are all narrow, then when one machine crashes, the system just needs to recover the lost RDDs from the most recent checkpoint. However, if all transformations are wide (e.g. in calculating PageRank), then when one node crashes, all other nodes need to roll back to the most recent checkpoint. Is my understanding correct? Thanks! Best Regards, Fan
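(For context, a minimal sketch of how checkpointing is typically used to bound recovery when the lineage grows through wide dependencies; the checkpoint directory, data and iteration count are all illustrative assumptions, not from this thread.)

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def iterate(sc: SparkContext): RDD[(String, Double)] = {
  sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")  // assumption: a reliable store is available

  val links: RDD[(String, Seq[String])] =
    sc.parallelize(Seq("a" -> Seq("b"), "b" -> Seq("a")))
  var ranks: RDD[(String, Double)] = links.mapValues(_ => 1.0)

  var i = 0
  while (i < 10) {
    // join + reduceByKey are wide (shuffle) dependencies: without checkpoints,
    // recovering a lost partition would replay every previous iteration
    ranks = links.join(ranks)
      .flatMap { case (_, (dests, r)) => dests.map(d => (d, r / dests.size)) }
      .reduceByKey(_ + _)
    if (i % 5 == 4) {
      ranks.checkpoint()   // truncate the lineage every few iterations
      ranks.count()        // run an action so the checkpoint is actually written
    }
    i += 1
  }
  ranks
}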
kinesis creating stream scala code exception
Hi experts, I want to consume data from a Kinesis stream using Spark Streaming. I am trying to create the Kinesis stream using Scala code. Here is my code:

def main(args: Array[String]) {
  println("Stream creation started")
  if (create(2))
    println("Stream is created successfully")
}

def create(shardCount: Int): Boolean = {
  val credentials = new BasicAWSCredentials(KinesisProperties.AWS_ACCESS_KEY_ID, KinesisProperties.AWS_SECRET_KEY)
  var kinesisClient: AmazonKinesisClient = new AmazonKinesisClient(credentials)
  kinesisClient.setEndpoint(KinesisProperties.KINESIS_ENDPOINT_URL, KinesisProperties.KINESIS_SERVICE_NAME, KinesisProperties.KINESIS_REGION_ID)
  val createStreamRequest = new CreateStreamRequest()
  createStreamRequest.setStreamName(KinesisProperties.myStreamName)
  createStreamRequest.setShardCount(shardCount)
  val describeStreamRequest = new DescribeStreamRequest()
  describeStreamRequest.setStreamName(KinesisProperties.myStreamName)
  try {
    Thread.sleep(12)
  } catch {
    case e: Exception =>
  }
  var streamStatus = "not active"
  while (!streamStatus.equalsIgnoreCase("ACTIVE")) {
    try {
      Thread.sleep(1000)
    } catch {
      case e: Exception => e.printStackTrace()
    }
    try {
      val describeStreamResponse = kinesisClient.describeStream(describeStreamRequest)
      streamStatus = describeStreamResponse.getStreamDescription.getStreamStatus
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
  if (streamStatus.equalsIgnoreCase("ACTIVE")) true else false
}

When I run this code I get the following exception:

Exception in thread "main" java.lang.NoSuchMethodError: org.joda.time.format.DateTimeFormatter.withZoneUTC()Lorg/joda/time/format/DateTimeFormatter;
    at com.amazonaws.auth.AWS4Signer.<clinit>(AWS4Signer.java:44)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at java.lang.Class.newInstance(Class.java:379)
    at com.amazonaws.auth.SignerFactory.createSigner(SignerFactory.java:119)
    at com.amazonaws.auth.SignerFactory.lookupAndCreateSigner(SignerFactory.java:105)
    at com.amazonaws.auth.SignerFactory.getSigner(SignerFactory.java:78)
    at com.amazonaws.AmazonWebServiceClient.computeSignerByServiceRegion(AmazonWebServiceClient.java:307)
    at com.amazonaws.AmazonWebServiceClient.computeSignerByURI(AmazonWebServiceClient.java:280)
    at com.amazonaws.AmazonWebServiceClient.setEndpoint(AmazonWebServiceClient.java:160)
    at com.amazonaws.services.kinesis.AmazonKinesisClient.setEndpoint(AmazonKinesisClient.java:2102)
    at com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:216)
    at com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:139)
    at com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:116)
    at com.platalytics.platform.connectors.Kinesis.App$.create(App.scala:32)
    at com.platalytics.platform.connectors.Kinesis.App$.main(App.scala:26)
    at com.platalytics.platform.connectors.Kinesis.App.main(App.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)

I have the following Maven dependency:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kinesis-asl_2.10</artifactId>
  <version>1.2.0</version>
</dependency>

Any suggestions?
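(This particular NoSuchMethodError usually points to an older joda-time on the classpath winning over the 2.x version the AWS SDK expects; as far as I know, DateTimeFormatter.withZoneUTC() only appeared in joda-time 2.0. As an assumption not confirmed in this thread, explicitly pinning a 2.x joda-time in the application POM is one common way to rule that out; the version number is illustrative.)

<dependency>
  <!-- assumption: force a joda-time new enough to have withZoneUTC(); 2.6 is illustrative -->
  <groupId>joda-time</groupId>
  <artifactId>joda-time</artifactId>
  <version>2.6</version>
</dependency>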
PySpark saveAsTextFile gzip
Hi, I've searched but can't seem to find a PySpark example. How do I write compressed text file output to S3 using PySpark saveAsTextFile? Thanks, Tom
Re: MissingRequirementError with spark 1.2
I am also getting the same error after the 1.2 upgrade. The application is crashing on this line: rdd.registerTempTable("temp")
Re: MissingRequirementError with spark
I found this, which might be useful: https://github.com/deanwampler/spark-workshop/blob/master/project/Build.scala It seems that forking is needed.
Re: Serializability: for vs. while loops
Aaron, thanks for your mail! On Thu, Jan 15, 2015 at 5:05 PM, Aaron Davidson ilike...@gmail.com wrote: Scala for-loops are implemented as closures using anonymous inner classes [...] While loops, on the other hand, involve none of this trickery, and everyone is happy. Ah, I was suspecting something like that... thank you very much for the detailed explanation! Tobias
Re: RowMatrix multiplication
You can always define an RDD transpose function yourself. This is what I use in PySpark to transpose an RDD of numpy vectors. It's not optimal, and the vectors need to fit in memory on the worker nodes.

import numpy as np

def rddTranspose(rdd):
    # add an index to the rows and the columns, result in triplets
    dataT1 = rdd.zipWithIndex().flatMap(lambda (x,i): [(i,j,e) for (j,e) in enumerate(x)])
    # use the column index from the original as key, then group and sort
    dataT2 = dataT1.map(lambda (i,j,e): (j, (i,e))) \
                   .groupByKey().sortByKey()
    # sort the lists inside the rows
    dataT3 = dataT2.map(lambda (i, x): sorted(list(x), cmp=lambda (i1,e1),(i2,e2): cmp(i1, i2)))
    # remove the indices inside the rows
    dataT4 = dataT3.map(lambda x: map(lambda (i, y): y, x))
    # convert the rows to numpy arrays
    return dataT4.map(lambda x: np.asarray(x))

Cheers, Toni On 12 Jan 2015 at 20:45:58, Alex Minnaar (aminn...@verticalscope.com) wrote: That's not quite what I'm looking for. Let me provide an example. I have a RowMatrix A that is n x m and I have two local matrices b and c. b is m x 1 and c is n x 1. In my Spark job I wish to perform the following two computations: A*b and A^T*c. I don't think this is possible without being able to transpose a RowMatrix. Am I correct? Thanks, Alex From: Reza Zadeh r...@databricks.com Sent: Monday, January 12, 2015 1:58 PM To: Alex Minnaar Cc: u...@spark.incubator.apache.org Subject: Re: RowMatrix multiplication As you mentioned, you can perform A * b, where A is a RowMatrix and b is a local matrix. From your email, I figure you want to compute b * A^T. To do this, you can compute C = A b^T, whose result is the transpose of what you were looking for, i.e. C^T = b * A^T. To undo the transpose, you would have to transpose C manually yourself. Be careful though, because the result might not have each Row fit in memory on a single machine, which is what RowMatrix requires. This danger is why we didn't provide a transpose operation in RowMatrix natively. To address this and more, there is an effort to provide more comprehensive linear algebra through block matrices, which will likely make it to 1.3: https://issues.apache.org/jira/browse/SPARK-3434 Best, Reza On Mon, Jan 12, 2015 at 6:33 AM, Alex Minnaar aminn...@verticalscope.com wrote: I have a RowMatrix on which I want to perform two multiplications. The first is a right multiplication with a local matrix, which is fine. But after that I also wish to right multiply the transpose of my RowMatrix with a different local matrix. I understand that there is no functionality to transpose a RowMatrix at this time, but I was wondering if anyone could suggest any kind of work-around for this. I had thought that I might be able to initially create two RowMatrices - a normal version and a transposed version - and use either when appropriate. Can anyone think of another alternative? Thanks, Alex
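(For reference, a minimal Scala sketch of the direction RowMatrix does support in MLlib, i.e. A * b with a local right-hand matrix; names are illustrative. The A^T * c case still needs the transpose workaround Reza describes.)

import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// A is an n x m distributed RowMatrix; bValues holds the m entries of a local m x 1 matrix.
def rightMultiply(matA: RowMatrix, bValues: Array[Double]): RowMatrix = {
  val b = Matrices.dense(bValues.length, 1, bValues)  // column-major m x 1 local matrix
  matA.multiply(b)                                    // distributed n x 1 result
}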
Re: *ByKey aggregations: performance + order
I'm interested too and don't know for sure but I do not think this case is optimized this way. However if you know your keys aren't split across partitions and you have small enough partitions you can implement the same grouping with mapPartitions and Scala. On Jan 15, 2015 1:27 AM, Tobias Pfeiffer t...@preferred.jp wrote: Sean, thanks for your message. On Wed, Jan 14, 2015 at 8:36 PM, Sean Owen so...@cloudera.com wrote: On Wed, Jan 14, 2015 at 4:53 AM, Tobias Pfeiffer t...@preferred.jp wrote: OK, it seems like even on a local machine (with no network overhead), the groupByKey version is about 5 times slower than any of the other (reduceByKey, combineByKey etc.) functions... Even without network overhead, you're still paying the cost of setting up the shuffle and serialization. Can I pick an appropriate scheduler some time before so that Spark knows all items with the same key are on the same host? (Or enforce this?) Thanks Tobias
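(A minimal sketch of the mapPartitions-based grouping mentioned above; it is only valid under the stated assumption that all records for a given key live in the same partition and fit in memory there, so no shuffle is needed. Names are illustrative.)

import scala.collection.mutable
import org.apache.spark.rdd.RDD

def groupLocally[K, V](rdd: RDD[(K, V)]): RDD[(K, List[V])] =
  rdd.mapPartitions { it =>
    val groups = mutable.Map.empty[K, List[V]]
    it.foreach { case (k, v) =>
      groups(k) = v :: groups.getOrElse(k, Nil)  // accumulate values per key within the partition
    }
    groups.iterator
  }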
Spark-Sql not using cluster slaves
I have been using Spark SQL in cluster mode and I am noticing no distribution and parallelization of the query execution. The performance seems to be very slow compared to native Spark applications and does not offer any speedup when compared to Hive. I am using Spark 1.1.0 with a cluster of 5 nodes. The query that is being run consists of 4 subqueries, where each query creates groups based on a variable and then they are all unioned (I have attached part of the query at the end of this post). Implementing the query functionality in native Spark (Scala) seems to run in a normal distributed manner as expected, but executing the query on Spark SQL does not engage the slaves (observed from the master's web UI). The performance is the same whether I run the query with a cluster of a master and 4 slaves or a single node cluster (master only). I would greatly appreciate any pointers on the cause of the issue. Query sample:

SELECT * FROM (
  SELECT age AS varname, a.tile AS catname, a.myrandom, count(*) AS count
  FROM (
    SELECT *, cast((5 * rand(65535)) AS int) AS myrandom,
      CASE WHEN (age <= 19) THEN 1
           WHEN (age <= 35) THEN 2
           WHEN (age <= 51) THEN 3
           WHEN (age <= 63) THEN 4
           ELSE 5 END AS tile
    FROM tablea a) a
  GROUP BY a.tile, a.myrandom
  UNION ALL
  SELECT salary AS varname, a.tile AS catname, a.myrandom, count(*) AS count
  FROM (
    SELECT *, cast((5 * rand(65535)) AS int) AS myrandom,
      CASE WHEN (salary <= 39615) THEN 1
           WHEN (salary <= 65740) THEN 2
           WHEN (salary <= 117555) THEN 3
           ELSE 4 END AS tile
    FROM tablea a) a
  GROUP BY a.tile, a.myrandom
) unioned;
How to query Spark master for cluster status ?
Hi, I am using Spark 1.2. The Spark master UI shows a status. Is there a web service on the Spark master that returns the status of the cluster in JSON? Alternatively, what is the best way to determine if a cluster is up? Thanks in advance for your assistance! Shing
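(For what it's worth, the standalone master's web UI also serves a JSON rendering of the same status page; a minimal sketch, assuming a standalone master with its UI on the default port 8080 and an illustrative host name.)

// Fetch the master UI's JSON view; it includes the master status, workers and applications.
val json = scala.io.Source.fromURL("http://spark-master-host:8080/json").mkString
println(json)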
Re: kinesis creating stream scala code exception
Are you using Spark in standalone mode or on YARN or Mesos? If it's YARN, please mention the Hadoop distribution and version. What Spark distribution are you using (it seems to be 1.2.0, but compiled against which Hadoop version)? Thanks, Aniket On Thu, Jan 15, 2015, 4:59 PM Hafiz Mujadid hafizmujadi...@gmail.com wrote: [quoted original message omitted; see the post above]
Using native blas with mllib
Hi, I'm trying to use native BLAS, and I followed all the threads I saw here, but I still can't get rid of these warnings:

WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS

I'm using: CentOS 6.5, Java 1.7.0_71, Spark 1.1.0 running on a YARN cluster. The things I have done so far are:
- installed these packages: lapack-devel, blas-devel, atlas-devel, openblas-devel, gcc-gfortran.x86_64, libgfortran.x86_64
- added these dependencies to the mllib pom.xml:

<dependency>
  <groupId>org.scalanlp</groupId>
  <artifactId>breeze-natives_${scala.binary.version}</artifactId>
  <version>0.9</version>
</dependency>
<dependency>
  <groupId>com.github.fommil.netlib</groupId>
  <artifactId>all</artifactId>
  <version>1.1.2</version>
  <type>pom</type>
</dependency>

(I know that the -Pnetlib-lgpl flag should have taken care of the netlib dependency, but for some reason it didn't work.)
- built Spark with the command: mvn -Pyarn -Phive -Phadoop-2.5 -Pnetlib-lgpl -Dhadoop.version=2.5.0-cdh5.2.0 -DskipTests clean package (I'm building it on Windows, if that makes any difference)

And I did the following checks:

sudo /sbin/ldconfig -p | grep libblas.so.3
  libblas.so.3 (libc6,x86-64) => /usr/lib64/libblas.so.3
sudo /sbin/ldconfig -p | grep liblapack.so.3
  liblapack.so.3 (libc6,x86-64) => /usr/lib64/liblapack.so.3
  liblapack.so.3 (libc6,x86-64) => /usr/lib64/atlas/liblapack.so.3
jar tf assembly\target\scala-2.10\spark-assembly-1.1.0-cdh5.2.0-hadoop2.5.0-cdh5.2.0.jar | grep netlib-native_system-linux-x86_64.so
  netlib-native_system-linux-x86_64.so
  netlib-native_system-linux-x86_64.so.asc

Anything else I can try? Thanks, Lev.
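(One quick diagnostic, offered as a sketch rather than a known fix: netlib-java will report which backend it actually loaded if you ask it from the same classpath the job uses, for example from a spark-shell started against this assembly.)

// Prints the BLAS backend netlib-java managed to load on this classpath.
println(com.github.fommil.netlib.BLAS.getInstance().getClass.getName)
// NativeSystemBLAS => system libblas, NativeRefBLAS => bundled reference
// implementation, F2jBLAS => pure-Java fallback (the case behind the warnings above)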
Re: Using Spark SQL with multiple (avro) files
I've tried this now. Spark can load multiple avro files from the same directory by passing a path to a directory. However, passing multiple paths separated with commas didn't work. Is there any way to load all avro files in multiple directories using sqlContext.avroFile? On Wed, Jan 14, 2015 at 3:53 PM, David Jones letsnumsperi...@gmail.com wrote: Should I be able to pass multiple paths separated by commas? I haven't tried but didn't think it'd work. I'd expected a function that accepted a list of strings. On Wed, Jan 14, 2015 at 3:20 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: If the wildcard path you have doesn't work you should probably open a bug -- I had a similar problem with Parquet and it was a bug which recently got closed. Not sure if sqlContext.avroFile shares a codepath with .parquetFile...you can try running with bits that have the fix for .parquetFile or look at the source... Here was my question for reference: http://mail-archives.apache.org/mod_mbox/spark-user/201412.mbox/%3ccaaswr-5rfmu-y-7htluj2eqqaecwjs8jh+irrzhm7g1ex7v...@mail.gmail.com%3E On Wed, Jan 14, 2015 at 4:34 AM, David Jones letsnumsperi...@gmail.com wrote: Hi, I have a program that loads a single avro file using spark SQL, queries it, transforms it and then outputs the data. The file is loaded with: val records = sqlContext.avroFile(filePath) val data = records.registerTempTable(data) ... Now I want to run it over tens of thousands of Avro files (all with schemas that contain the fields I'm interested in). Is it possible to load multiple avro files recursively from a top-level directory using wildcards? All my avro files are stored under s3://my-bucket/avros/*/DATE/*.avro, and I want to run my task across all of these on EMR. If that's not possible, is there some way to load multiple avro files into the same table/RDD so the whole dataset can be processed (and in that case I'd supply paths to each file concretely, but I *really* don't want to have to do that). Thanks David
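(If comma-separated paths don't work, one workaround is to load each directory separately and union the results. A minimal sketch, assuming the spark-avro package that provides avroFile and that all files share a compatible schema; paths are illustrative.)

import com.databricks.spark.avro._   // assumption: the spark-avro package providing avroFile

val paths = Seq(
  "s3://my-bucket/avros/sys1/20141212/",
  "s3://my-bucket/avros/sys1/20141213/")
val all = paths.map(p => sqlContext.avroFile(p)).reduce(_ unionAll _)  // one SchemaRDD for everything
all.registerTempTable("data")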
Re: MissingRequirementError with spark
Added fork := true in the Scala build. Command-line sbt is working fine, but the Eclipse Scala IDE is still giving the same error. This was all working fine until Spark 1.1.
Inserting an element in RDD[String]
Hi experts! I have an RDD[String] and I want to add a schema line at the beginning of this RDD. I know RDDs are immutable. So is there any way to have a new RDD with one schema line plus the contents of the previous RDD? Thanks
Re: How to define SparkContext with Cassandra connection for spark-jobserver?
In the spark-jobserver bin folder, you will find the application.conf file. Put:

context-settings {
  spark.cassandra.connection.host = "your address"
}

Hope this works.
Some tasks are taking long time
Hi, My spark job is taking long time. I see that some tasks are taking longer time for same amount of data and shuffle read/write. What could be the possible reasons for it ? The thread-dump sometimes show that all the tasks in an executor are waiting with following stack trace - Executor task launch worker-12 daemon prio=10 tid=0x7fcd44276000 nid=0x3f85 waiting on condition [0x7fcce3ddc000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 0x7fd0aee82e00 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(Unknown Source) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source) at java.util.concurrent.LinkedBlockingQueue.take(Unknown Source) at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:253) at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:77) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) Any inputs/suggestions to improve job time will be appreciated. Regards,Ajay
Re: saveAsTextFile
Hi, before saving the RDD, do a collect on the RDD and print its contents. Probably it's a null value. Thanks. On Sat, Jan 3, 2015 at 5:37 PM, Pankaj Narang wrote: If you can paste the code here I can certainly help. Also confirm the version of Spark you are using. Regards, Pankaj, Infoshore Software, India
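(A slightly gentler variant of that check, sketched with illustrative names: sample a few records rather than collecting the whole RDD to the driver.)

import org.apache.spark.rdd.RDD

def inspectAndSave(rdd: RDD[String], path: String): Unit = {
  println(s"count = ${rdd.count()}")
  rdd.take(10).foreach(println)   // spot-check for nulls or empty records
  rdd.saveAsTextFile(path)
}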
Re: Failing jobs runs twice
Found a setting that seems to fix this problem, but it does not seem to be available until Spark 1.3. See https://issues.apache.org/jira/browse/SPARK-2165 However, glad to see that work is being done on the issue. On Tue, Jan 13, 2015 at 8:00 PM, Anders Arpteg arp...@spotify.com wrote: Yes Andrew, I am. Tried setting spark.yarn.applicationMaster.waitTries to 1 (thanks Sean), but with no luck. Any ideas? On Tue, Jan 13, 2015 at 7:58 PM, Andrew Or and...@databricks.com wrote: Hi Anders, are you using YARN by any chance? 2015-01-13 0:32 GMT-08:00 Anders Arpteg arp...@spotify.com: Since starting to use Spark 1.2, I've experienced an annoying issue with failing apps that get executed twice. I'm not talking about tasks inside a job, which should be executed multiple times before failing the whole app. I'm talking about the whole app, which seems to close the previous Spark context, start a new one, and rerun the app again. This is annoying since it overwrites the log files as well, and it becomes hard to troubleshoot the failing app. Does anyone know how to turn this feature off? Thanks, Anders
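(For reference, the setting SPARK-2165 introduces caps the number of YARN application attempts. Once on Spark 1.3 or later, it would be passed roughly like this; the class and jar names are illustrative.)

spark-submit \
  --master yarn-cluster \
  --conf spark.yarn.maxAppAttempts=1 \
  --class com.example.MyApp myapp.jar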
Error when running SparkPi on Secure HA Hadoop cluster
Hi, Setup is as follows Hadoop Cluster 2.3.0 (CDH5.0) - Namenode HA - Resource manager HA - Secured with Kerberos Spark 1.2 Run SparkPi as follows - conf/spark-defaults.conf has following entries spark.yarn.queue myqueue spark.yarn.access.namenodes hdfs://namespace (remember this is namenode HA) - Do kinit with some user keytab - submit SparkPi as follows spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 --queue thequeue $MY_SPARK_DIR/lib/spark-examples*.jar 10 Gives following trace (not sure why it shows unknown queue when queue name is specified in the spark-defaults.conf above. 15/01/15 19:18:27 INFO impl.YarnClientImpl: Submitted application application_1415648563285_31469 15/01/15 19:18:28 INFO yarn.Client: Application report for application_1415648563285_31469 (state: FAILED) 15/01/15 19:18:28 INFO yarn.Client: client token: N/A diagnostics: Application application_1415648563285_31469 submitted by user XYZ to unknown queue: thequeue --- WHY UNKNOWN QUEUE ??? ApplicationMaster host: N/A ApplicationMaster RPC port: -1 queue: thequeue --- WHY UNKNOWN QUEUE ??? start time: 1421349507652 final status: FAILED tracking URL: N/A user: XYZ Exception in thread main org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master. at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:102) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:58)
Re: Inserting an element in RDD[String]
Sure there is. Create a new RDD just containing the schema line (hint: use sc.parallelize) and then union both the RDDs (the header RDD and data RDD) to get a final desired RDD. On Thu Jan 15 2015 at 19:48:52 Hafiz Mujadid hafizmujadi...@gmail.com wrote: hi experts! I hav an RDD[String] and i want to add schema line at beginning in this rdd. I know RDD is immutable. So is there anyway to have a new rdd with one schema line and contents of previous rdd? Thanks -- View this message in context: http://apache-spark-user-list. 1001560.n3.nabble.com/Inserting-an-element-in-RDD-String-tp21161.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
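(A minimal Scala sketch of that approach, with illustrative names. Note that union concatenates at the partition level, so the single-partition header RDD comes first, which is usually what you want when the result is later saved as text.)

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def withHeader(sc: SparkContext, schemaLine: String, data: RDD[String]): RDD[String] = {
  val header = sc.parallelize(Seq(schemaLine), 1)  // one-element, single-partition RDD
  header.union(data)                               // header partition precedes the data partitions
}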
Re: Some tasks are taking long time
If you don't want a few slow tasks to slow down the entire job, you can turn on speculation. Here are the speculation settings from the Spark Configuration page of the Spark 1.2.0 documentation on spark.apache.org:

spark.speculation (default: false) - If set to true, performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched.
spark.speculation.interval (default: 100) - How often Spark will check for tasks to speculate, in milliseconds.
spark.speculation.quantile (default: 0.75) - Percentage of tasks which must be complete before speculation is enabled for a particular stage.
spark.speculation.multiplier (default: 1.5) - How many times slower a task is than the median to be considered for speculation.

On Thursday, January 15, 2015 5:44 AM, Ajay Srivastava a_k_srivast...@yahoo.com.INVALID wrote: [quoted original message omitted; see the post above]
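(A sketch of how these settings would typically be passed at submission time; the class and jar names are illustrative, and the numeric values are simply the defaults listed above.)

spark-submit \
  --conf spark.speculation=true \
  --conf spark.speculation.quantile=0.75 \
  --conf spark.speculation.multiplier=1.5 \
  --class com.example.MyJob myjob.jar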
Re: Inserting an element in RDD[String]
Hi, you can take the schema line in another RDD and then do a union of the two RDDs.

List<String> schemaList = new ArrayList<String>();
schemaList.add("xyz");                                  // where "xyz" is your schema line
JavaRDD<String> schemaRDD = sc.parallelize(schemaList); // where sc is your JavaSparkContext
JavaRDD<String> newRDD = schemaRDD.union(yourRDD);      // where yourRDD is the RDD you want to prepend the schema line to

The code is in Java; you can change it to Scala. Thanks. On Thu, Jan 15, 2015 at 7:46 PM, Hafiz Mujadid [via Apache Spark User List] wrote: [quoted original message omitted; see the post above]
Re: How to force parallel processing of RDD using multiple thread
Check the number of partitions in your input. It may be much less than the available parallelism of your small cluster. For example, input that lives in just 1 partition will spawn just 1 task. Beyond that parallelism just happens. You can see the parallelism of each operation in the Spark UI. On Thu, Jan 15, 2015 at 10:53 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Spark Standalone cluster. My program is running very slow, I suspect it is not doing parallel processing of rdd. How can I force it to run parallel? Is there anyway to check whether it is processed in parallel? Regards, Ningjun Wang Consulting Software Engineer LexisNexis 121 Chanlon Road New Providence, NJ 07974-1541 -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Thursday, January 15, 2015 4:29 PM To: Wang, Ningjun (LNG-NPV) Cc: user@spark.apache.org Subject: Re: How to force parallel processing of RDD using multiple thread What is your cluster manager? For example on YARN you would specify --executor-cores. Read: http://spark.apache.org/docs/latest/running-on-yarn.html On Thu, Jan 15, 2015 at 8:54 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: I have a standalone spark cluster with only one node with 4 CPU cores. How can I force spark to do parallel processing of my RDD using multiple threads? For example I can do the following Spark-submit --master local[4] However I really want to use the cluster as follow Spark-submit --master spark://10.125.21.15:7070 In that case, how can I make sure the RDD is processed with multiple threads/cores? Thanks Ningjun - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
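(A quick sketch of checking and raising the input parallelism; the path and partition counts are illustrative, and sc is assumed to be an existing SparkContext.)

val input = sc.textFile("hdfs:///data/input", 8)           // request at least 8 splits
println(s"input partitions = ${input.partitions.length}")  // what you actually got
val wider = input.repartition(16)                          // force more tasks in downstream stages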
Re: Anyway to make RDD preserve input directories structures?
Maybe you are saying you already do this, but it's perfectly possible to process as many RDDs as you like in parallel on the driver. That may allow your current approach to eat up as much parallelism as you like. I'm not sure if that's what you are describing with "submit multi applications", but you do not need separate Spark applications, just something like a local parallel Scala collection invoking the RDD operations. Yes, these operations do not and should not 'append' data to existing files. Of course this has the downside of all the overhead of processing every file individually rather than as one big job. Although you may not be able to design this differently, I suggest that ideally you do not encode this information in the directory structure, but in the data itself. I know sometimes this is important for downstream tools though, as it is part of how partitioning is defined for example. On Fri, Jan 16, 2015 at 2:15 AM, 逸君曹 caoyijun2...@gmail.com wrote: Say there are some logs: s3://log-collections/sys1/20141212/nginx.gz s3://log-collections/sys1/20141213/nginx-part-1.gz s3://log-collections/sys1/20141213/nginx-part-2.gz I have a function that parses the logs for later analysis. I want to parse all the files, so I do this: logs = sc.textFile('s3://log-collections/sys1/') logs.map(parse).saveAsTextFile('s3://parsed-logs/') BUT, this destroys the date-separated naming schema, resulting in: s3://parsed-logs/part- s3://parsed-logs/part-0001 ... And the worse part is that when I get a new day's logs, it seems rdd.saveAsTextFile couldn't just append the new day's logs. So I create an RDD for every single file, parse it, and save it under the name I want, like this: one = sc.textFile('s3://log-collections/sys1/20141213/nginx-part-1.gz') one.map(parse).saveAsTextFile('s3://parsed-logs/20141213/01/') which results in: s3://parsed-logs/20141212/01/part- s3://parsed-logs/20141213/01/part- s3://parsed-logs/20141213/01/part-0001 s3://parsed-logs/20141213/02/part- s3://parsed-logs/20141213/02/part-0001 s3://parsed-logs/20141213/02/part-0002 And when a new day's logs come, I just process that day's logs and put them in the proper directory (or key). THE PROBLEM is that this way I have to create a separate RDD for every single file, which can't take advantage of Spark's automatic parallel processing. [I'm trying to submit multiple applications for each batch of files.] Or maybe I'd better use Hadoop Streaming for this? Any suggestions?
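(A minimal sketch of the suggestion above: drive one small job per day from a parallel Scala collection so several jobs run concurrently inside a single application. Paths are illustrative and `parse` stands for the poster's own parsing function.)

import org.apache.spark.SparkContext

def parseAll(sc: SparkContext, days: Seq[String], parse: String => String): Unit =
  days.par.foreach { day =>                      // driver-side concurrency; SparkContext is thread-safe
    sc.textFile(s"s3://log-collections/sys1/$day/")
      .map(parse)
      .saveAsTextFile(s"s3://parsed-logs/$day/") // one output directory per day, preserving the layout
  }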
RE: Issue with Parquet on Spark 1.2 and Amazon EMR
Thanks to Aniket's work, there are two new options to the EMR install script for Spark. See https://github.com/awslabs/emr-bootstrap-actions/blob/master/spark/README.md The "-a" option can be used to bump the spark-assembly to the front of the classpath. -Christopher From: Aniket Bhatnagar [mailto:aniket.bhatna...@gmail.com] Sent: Monday, January 12, 2015 3:05 AM To: Kelly, Jonathan; Adam Gilmore; user@spark.apache.org Subject: Re: Issue with Parquet on Spark 1.2 and Amazon EMR Meanwhile, I have submitted a pull request (https://github.com/awslabs/emr-bootstrap-actions/pull/37) that allows users to place their jars ahead of all other jars in the Spark classpath. This should serve as a temporary workaround for all class conflicts. Thanks, Aniket On Mon Jan 05 2015 at 22:13:47 Kelly, Jonathan jonat...@amazon.com wrote: I've noticed the same thing recently and will contact the appropriate owner soon. (I work for Amazon, so I'll go through internal channels and report back to this list.) In the meantime, I've found that editing spark-env.sh and putting the Spark assembly first in the classpath fixes the issue. I expect that the version of Parquet that's being included in the EMR libs just needs to be upgraded. ~ Jonathan Kelly From: Aniket Bhatnagar aniket.bhatna...@gmail.com Date: Sunday, January 4, 2015 at 10:51 PM To: Adam Gilmore dragoncu...@gmail.com, user@spark.apache.org Subject: Re: Issue with Parquet on Spark 1.2 and Amazon EMR Can you confirm your EMR version? Could it be because of the classpath entries for EMRFS? You might face issues with using S3 without them. Thanks, Aniket On Mon, Jan 5, 2015, 11:16 AM Adam Gilmore dragoncu...@gmail.com wrote: Just an update on this - I found that the script by Amazon was the culprit - not exactly sure why. When I installed Spark manually onto the EMR cluster (and did the manual configuration of all the EMR stuff), it worked fine. On Mon, Dec 22, 2014 at 11:37 AM, Adam Gilmore dragoncu...@gmail.com wrote: Hi all, I've just launched a new Amazon EMR cluster and used the script at: s3://support.elasticmapreduce/spark/install-spark to install Spark (this script was upgraded to support 1.2). I know there are tools to launch a Spark cluster in EC2, but I want to use EMR. Everything installs fine; however, when I go to read from a Parquet file, I end up with (the main exception): Caused by: java.lang.NoSuchMethodError: parquet.hadoop.ParquetInputSplit.<init>(Lorg/apache/hadoop/fs/Path;JJJ[Ljava/lang/String;[JLjava/lang/String;Ljava/util/Map;)V at parquet.hadoop.TaskSideMetadataSplitStrategy.generateTaskSideMDSplits(ParquetInputFormat.java:578) ... 55 more It seems to me like a version mismatch somewhere. Where is the parquet-hadoop jar coming from? Is it built into a fat jar for Spark? Any help would be appreciated. Note that 1.1.1 worked fine with Parquet files.