Spark on Mesos causes mesos-master OOM
Hi List,

We recently tried running Spark on Mesos, and we encountered a fatal error: the mesos-master process continuously consumes memory until it is finally killed by the OOM killer. This only happens while a Spark job (fine-grained mode) is running. We root-caused the issue: the Spark executor attaches the RDD's computed result to the TaskStatus, as below:

--8<---8<-- spark.git/core/src/main/scala/org/apache/spark/executor/Executor.scala
val serializedDirectResult = ser.serialize(directResult)
logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
val serializedResult = {
  if (serializedDirectResult.limit >= execBackend.akkaFrameSize() - AkkaUtils.reservedSizeBytes) {
    logInfo("Storing result for " + taskId + " in local BlockManager")
    val blockId = TaskResultBlockId(taskId)
    env.blockManager.putBytes(
      blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
    ser.serialize(new IndirectTaskResult[Any](blockId))
  } else {
    logInfo("Sending result for " + taskId + " directly to driver")
    serializedDirectResult
  }
}
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
logInfo("Finished task ID " + taskId)
--8<---8<--

And the Spark executor log shows how large the serializedResult is:

14/08/22 13:29:18 INFO Executor: Serialized size of result for 248 is 17573033

Since in fine-grained mode every single Spark stage finishes in, say, 10 seconds and may have tens of tasks, mesos-master generally hits OOM within tens of minutes. I'm not familiar with Spark internals, and I'm wondering whether we should avoid storing serializedResult in TaskStatus?

--
Thanks,
Chengwei
Re: OOM Java heap space error on saveAsTextFile
What operation are you performing before doing the saveAsTextFile? If you are doing a groupBy/sortBy/mapPartitions/reduceByKey operation, then you can specify the number of partitions. We were facing this kind of problem, and specifying the correct number of partitions solved the issue.

Thanks
Best Regards

On Fri, Aug 22, 2014 at 2:06 AM, Daniil Osipov daniil.osi...@shazam.com wrote:
Hello,

My job keeps failing on the saveAsTextFile stage (frustrating after a 3-hour run) with an OOM exception. The log is below. I'm running the job on an input of ~8TB of gzipped JSON files, executing on 15 m3.xlarge instances. The executor is given 13GB of memory, and I'm setting two custom preferences in the job: spark.akka.frameSize: 50 (otherwise it fails due to exceeding the limit of 10MB), spark.storage.memoryFraction: 0.2. Any suggestions?

14/08/21 19:29:26 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-99-160-181.ec2.internal:36962
14/08/21 19:29:31 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 17541459 bytes
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-144-221-26.ec2.internal:49973
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-69-31-121.ec2.internal:34569
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-165-70-221.ec2.internal:49193
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-218-181-93.ec2.internal:57648
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-142-187-230.ec2.internal:48115
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-101-178-68.ec2.internal:51931
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-99-165-121.ec2.internal:38153
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-179-187-182.ec2.internal:55645
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-182-231-107.ec2.internal:54088
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-165-79-9.ec2.internal:40112
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-111-169-138.ec2.internal:40394
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-203-161-222.ec2.internal:47447
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-153-141-230.ec2.internal:53906
14/08/21 19:29:32 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-20] shutting down ActorSystem [spark]
java.lang.OutOfMemoryError: Java heap space
        at com.google.protobuf_spark.AbstractMessageLite.toByteArray(AbstractMessageLite.java:62)
        at akka.remote.transport.AkkaPduProtobufCodec$.constructPayload(AkkaPduCodec.scala:145)
        at akka.remote.transport.AkkaProtocolHandle.write(AkkaProtocolTransport.scala:156)
        at akka.remote.EndpointWriter$$anonfun$7.applyOrElse(Endpoint.scala:569)
        at akka.remote.EndpointWriter$$anonfun$7.applyOrElse(Endpoint.scala:544)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
        at akka.actor.FSM$class.processEvent(FSM.scala:595)
        at akka.remote.EndpointWriter.processEvent(Endpoint.scala:443)
        at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:589)
        at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:583)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/08/21 19:29:32 INFO scheduler.DAGScheduler: Failed to run saveAsTextFile at RecRateApp.scala:88
Exception in
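For reference, a minimal sketch of specifying the partition count on the shuffle stage, as suggested above (the count and paths are hypothetical; tune to the cluster):

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // pair-RDD functions in Spark 1.x

def run(sc: SparkContext): Unit = {
  val pairs = sc.textFile("s3n://bucket/input").map(line => (line, 1))
  // an explicit partition count keeps each shuffle partition small enough to fit in memory
  val counts = pairs.reduceByKey(_ + _, 2000)
  counts.saveAsTextFile("s3n://bucket/output")
}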
Re: Finding previous and next element in a sorted RDD
There's no way to avoid a shuffle, since the first and last elements of each partition need to be computed together with the others, but I wonder if there is a way to do a minimal shuffle.

On Thu, Aug 21, 2014 at 6:13 PM, cjwang c...@cjwang.us wrote:
One way is to do zipWithIndex on the RDD. Then use the index as a key. Add or subtract 1 for the previous or next element. Then use cogroup or join to bind them together.

val idx = input.zipWithIndex
val previous = idx.map(x => (x._2 + 1, x._1))
val current = idx.map(x => (x._2, x._1))
val next = idx.map(x => (x._2 - 1, x._1))

val joined = current leftOuterJoin previous leftOuterJoin next

The code looks clean to me, but I feel uneasy about the performance of the joins.
Re: Advantage of using cache()
Because map-reduce tasks like join will save shuffle data to disk, the only difference between the caching and no-caching versions is:

.map { case (x, (n, i)) => (x, n) }

- Thanks, Nieyuan
Re: LDA example?
You can check out this pull request: https://github.com/apache/spark/pull/476. LDA is on the roadmap for the 1.2 release; hopefully we will officially support it then!

Best,
Burak

----- Original Message -----
From: Denny Lee denny.g@gmail.com
To: user@spark.apache.org
Sent: Thursday, August 21, 2014 10:10:35 PM
Subject: LDA example?

Quick question - is there a handy sample / example of how to use the LDA algorithm within Spark MLlib?

Thanks!
Denny
Re: DStream start a separate DStream
Why don't you directly use the DStream created as the output of the windowing process? Any reason not to?

Regards,
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi (https://twitter.com/mayur_rustagi)

On Thu, Aug 21, 2014 at 8:38 PM, Josh J joshjd...@gmail.com wrote:
Hi,

I would like to have a sliding-window DStream perform a streaming computation and store the results. Once the results are stored, I would then like to process them. However, I must wait until the final computation is done for all tuples in the sliding window before I begin the new DStream. How can I accomplish this with Spark?

Sincerely,
Josh
iterator causes NotSerializableException
Hi,

The following code gives me 'Task not serializable: java.io.NotSerializableException: scala.collection.mutable.ArrayOps$ofInt':

var x = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
var iter = Array(5).toIterator
var value = 5
var value2 = iter.next

x.map(q => q * value).collect()  // Line 1, it works
x.map(q => q * value2).collect() // Line 2, error

'value' and 'value2' look exactly the same, so why does this happen? An iterator from RDD.toLocalIterator causes this too. I tested it in spark-shell on Spark 1.0.2.

Thanks,
Kevin
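A sketch of one workaround, assuming the cause is the spark-shell line wrappers (the closure over value2 drags in the wrapper object that also holds the non-serializable iterator), so keeping the iterator out of the captured scope avoids the error:

import org.apache.spark.SparkContext

def multiplied(sc: SparkContext): Array[Int] = {
  val x = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
  val v = Array(5).toIterator.next // v is a plain Int; the iterator is a dead local here
  x.map(q => q * v).collect()      // the closure captures only the Int
}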
countByWindow: save the count?
Hi,

Hopefully a simple question: is there an example of how to save the output of countByWindow? I would like to save the results to external storage (Kafka or Redis). The examples show only stream.print().

Thanks,
Josh
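A minimal sketch of one way to do it with foreachRDD (the external write is left as a comment; the client call shown there is hypothetical):

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

def saveWindowCounts(ssc: StreamingContext, stream: DStream[String]): Unit = {
  ssc.checkpoint("/tmp/checkpoint") // countByWindow uses an inverse reduce, which needs checkpointing
  val counts = stream.countByWindow(Seconds(30), Seconds(10))
  counts.foreachRDD { rdd =>
    // each interval yields a single count, so collect() is tiny
    rdd.collect().foreach { n =>
      // write n to Kafka or Redis here, e.g. jedis.set("window-count", n.toString)
    }
  }
}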
Installation On Windows machine
Hello Team,

I was just trying to install Spark on my Windows Server 2012 machine and use it in my project, but unfortunately I do not find any documentation for the same. Please let me know if we have drafted anything for Spark users on Windows. I am really in need of it, as we are using Windows machines for Hadoop and other tools and so cannot move back to a Linux OS. We run Hadoop on the Hortonworks HDP 2.0 platform, and recently I came across Spark, so I wanted to use it in my project for my analytics work. Please suggest links or documents so I can move ahead with my installation and usage. I want to run it on Java.

Looking forward to a reply, and thanking you in advance.

Sincerely,
Abhishek

Thanks,
Abhishek Mishra
Software Engineer
Innovation Delivery CoE (IDC)
Xerox Services India
4th Floor Tapasya, Infopark, Kochi, Kerala, India 682030
m +91-989-516-8770
www.xerox.com/businessservices
On Spark Standalone mode, where will the driver program run?
Hi all,

1. In Spark standalone mode, when a client submits an application, where does the driver program run: on the client or on the master?
2. Is standalone mode reliable? Can it be used in production?

taoist...@gmail.com
[PySpark][Python 2.7.8][Spark 1.0.2] count() with TypeError: an integer is required
I am using PySpark with an IPython notebook.

data = sc.parallelize(range(1000), 10)
# successful
data.map(lambda x: x + 1).collect()
# Error
data.count()

Something similar: http://apache-spark-user-list.1001560.n3.nabble.com/Exception-on-simple-pyspark-script-td3415.html but it does not explain how to solve it. Can anyone help?

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-10-0106b6ff8a89> in <module>()
----> 1 data.count()

/home/workspace/spark-1.0.2-bin-hadoop2/python/pyspark/rdd.pyc in count(self)
--> 737         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()

/home/workspace/spark-1.0.2-bin-hadoop2/python/pyspark/rdd.pyc in sum(self)
--> 728         return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)

/home/workspace/spark-1.0.2-bin-hadoop2/python/pyspark/rdd.pyc in reduce(self, f)
    646         if acc is not None:
    647             yield acc
--> 648         vals = self.mapPartitions(func).collect()
    649         return reduce(f, vals)

/home/workspace/spark-1.0.2-bin-hadoop2/python/pyspark/rdd.pyc in collect(self)
    611         with _JavaStackTrace(self.context) as st:
--> 612             bytesInJava = self._jrdd.collect().iterator()
    613         return list(self._collect_iterator_through_file(bytesInJava))

/home/workspace/spark-1.0.2-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    535         answer = self.gateway_client.send_command(command)
    536         return_value = get_return_value(answer, self.gateway_client,
--> 537                 self.target_id, self.name)

/home/workspace/spark-1.0.2-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298             raise Py4JJavaError(
    299                 'An error occurred while calling {0}{1}{2}.\n'.
--> 300                 format(target_id, '.', name), value)
    301         else:
    302             raise Py4JError(

org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File /home/workspace/spark-0.9.1-bin-hadoop2/python/pyspark/worker.py, line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File /home/workspace/spark-0.9.1-bin-hadoop2/python/pyspark/serializers.py, line 182, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File /home/workspace/spark-0.9.1-bin-hadoop2/python/pyspark/serializers.py, line 117, in dump_stream
    for obj in iterator:
  File /home/workspace/spark-0.9.1-bin-hadoop2/python/pyspark/serializers.py, line 171, in _batched
    for item in iterator:
  File /home/workspace/spark-1.0.2-bin-hadoop2/python/pyspark/rdd.py, line 642, in func
TypeError: an integer is required

        org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115)
        org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:145)
        org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
        org.apache.spark.scheduler.Task.run(Task.scala:51)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        java.lang.Thread.run(Thread.java:722)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
Re: [PySpark][Python 2.7.8][Spark 1.0.2] count() with TypeError: an integer is required
I'm running pyspark with Python 2.7.8 under virtualenv. The system Python version is 2.6.x.
Re: Extracting an element from the feature vector in LabeledPoint
Hi all,

Somewhat related to this question and this data structure: what is the best way to extract features using names instead of positions? Of course, it is first necessary to store the names in some way...

Thanks in advance
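A minimal sketch of one approach, keeping a name-to-index map alongside the data (the feature names here are hypothetical):

import org.apache.spark.mllib.regression.LabeledPoint

// built once, e.g. from the header line of the input file
val featureIndex: Map[String, Int] = Map("age" -> 0, "income" -> 1)

def feature(p: LabeledPoint, name: String): Double =
  p.features(featureIndex(name)) // Vector.apply does the positional lookup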
"Block input-* already exists on this machine; not re-adding it" warnings
Hi everyone,

I back-ported kinesis-asl to Spark 1.0.2 and ran a quick test on my local machine. It seems to be working fine, but I keep getting the following warning. I am not sure what it means and whether it is something to worry about or not.

2014-08-22 15:53:43,803 [pool-1-thread-7] WARN o.apache.spark.storage.BlockManager - Block input-0-1408703023600 already exists on this machine; not re-adding it

Thoughts?

Thanks,
Aniket
Re: Finding Rank in Spark
Does anyone know a way to do this? I tried sorting it and writing an auto-increment function, but since the computation is parallel the result is wrong. Is there any way? Please reply.
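For what it's worth, a minimal sketch of a global rank that is correct under parallelism (assumes Spark 1.0+, where sortBy and zipWithIndex are available):

import org.apache.spark.rdd.RDD

def rank(values: RDD[Double]): RDD[(Double, Long)] =
  values.sortBy(identity).zipWithIndex() // (value, 0-based rank)

Unlike a per-partition auto-increment, zipWithIndex counts partition sizes first and assigns globally consecutive indices, so after the sort the index is the rank.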
Understanding how to create custom DStreams in Spark streaming
Hi everyone,

Sorry about the noob question, but I am struggling to understand the ways to create DStreams in Spark. Here is my understanding based on what I could gather from the documentation and from studying the Spark code (as well as some hunches). Please correct me if I am wrong.

1. In most cases, one would extend either ReceiverInputDStream or InputDStream to create a custom DStream that pulls data from an external system.
- ReceiverInputDStream is used to distribute data-receiving code (i.e. a Receiver) to workers. N instances of ReceiverInputDStream result in distribution to N workers. There is no control over which worker node executes which instance of the receiving code.
- InputDStream is used to run receiving code in the driver. The driver creates RDDs, which are distributed to worker nodes that run the processing logic. There is no way to control how an RDD gets distributed to workers unless one repartitions the generated RDDs.
2. DStreams or RDDs get no feedback on whether the processing was successful or not. This means one can't implement a re-pull in case of failure.

The above makes me realize that it is not trivial to implement a streaming use case with an at-least-once processing guarantee. Any thoughts on building a reliable real-time processing system using Spark will be appreciated.
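For concreteness, a minimal custom receiver sketch against the Spark 1.x receiver API (the external source pull is left hypothetical):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class MySourceReceiver(host: String, port: Int)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // receive on a separate thread; onStart must not block
    new Thread("my-source-receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {} // the receive loop exits once isStopped() returns true

  private def receive(): Unit = {
    while (!isStopped()) {
      store("record") // replace with a real pull from the external system
    }
  }
}

// usage: val lines = ssc.receiverStream(new MySourceReceiver("host", 9999))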
Losing Executors on cluster with RDDs of 100GB
Hi all,

I have a Spark cluster of 30 machines, 16GB / 8 cores each, running in standalone mode. Previously my application was working well (several RDDs, the largest being around 50GB). When I started processing larger amounts of data (RDDs of 100GB), my app started losing executors. Currently I am just loading them from a database, repartitioning, and persisting to disk (with replication x2).

I have spark.executor.memory=9G, memoryFraction=0.5, spark.worker.timeout=120, spark.akka.askTimeout=30, spark.storage.blockManagerHeartBeatMs=3. I haven't changed the default worker memory, so it's at 512m (should this be larger?).

I've been getting the following messages from my app:

[error] o.a.s.s.TaskSchedulerImpl - Lost executor 3 on myserver1: worker lost
[error] o.a.s.s.TaskSchedulerImpl - Lost executor 13 on myserver2: Unknown executor exit code (137) (died from signal 9?)
[error] a.r.EndpointWriter - AssociationError [akka.tcp://spark@master:59406] -> [akka.tcp://sparkExecutor@myserver2:32955]: Error [Association failed with [akka.tcp://sparkExecutor@myserver2:32955]]
[ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkexecu...@myserver2.com:32955]
  Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: myserver2/198.18.102.160:32955 ]
[error] a.r.EndpointWriter - AssociationError [akka.tcp://spark@master:59406] -> [akka.tcp://spark@myserver1:53855]: Error [Association failed with [akka.tcp://spark@myserver1:53855]]
[ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://spark@myserver1:53855]
  Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: myserver1/198.18.102.160:53855 ]

The worker logs and executor logs do not contain errors. Any ideas what the problem is?

Yadid
Re: [PySpark][Python 2.7.8][Spark 1.0.2] count() with TypeError: an integer is required
Do I have to deploy Python to every machine for $PYSPARK_PYTHON to work correctly?
Do we have to install snappy when running shuffle jobs
Hi everyone!

Spark has now set Snappy as the default compression codec in spark-1.1.0-SNAPSHOT. So if I want to run a shuffle job, do I have to install snappy on Linux?
Manipulating/Analyzing CSV files in Spark on local machine
Hello all,

I am new to Spark, and I want to analyze a CSV file using Spark on my local machine. The CSV file contains an airline database, and I want to compute a few descriptive statistics (e.g. the maximum of one column, the mean and standard deviation of a column, etc.) for my file. I am reading the file using a simple sc.textFile("file.csv"). The questions are:

1. Is there an optimal way of reading the file so that loading takes less time in Spark? The file can be 3GB.
2. How to handle column manipulations for the type of queries given above?

Thank you.

Regards,
Vineet Hingorani
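On question 2, a minimal sketch for one numeric column (the column index is hypothetical, and a header line, if any, would need to be filtered out first):

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // DoubleRDDFunctions for stats()

def columnStats(sc: SparkContext): Unit = {
  val rows = sc.textFile("file.csv").map(_.split(","))
  val col = rows.map(r => r(14).toDouble) // index 14 is a made-up numeric column
  val st = col.stats() // one pass: count, mean, stdev, min, max
  println(s"max=${st.max} mean=${st.mean} stdev=${st.stdev}")
}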
why classTag not typeTag?
Folks,

I am wondering why Spark uses ClassTag in RDD[T: ClassTag] instead of the more capable TypeTag option. I have some code that needs TypeTag functionality, and I don't know if a TypeTag can be converted to a ClassTag.

Mohit.
Re: pyspark/yarn and inconsistent number of executors
Hi Calvin,

When you say "until all the memory in the cluster is allocated and the job gets killed", do you know what's going on? Spark apps should never be killed for requesting / using too many resources. Any associated error message?

Unfortunately there are no tools currently for tweaking the number of executors in an automated manner. An option to use the entire YARN cluster seems useful. I just filed a JIRA for it: https://issues.apache.org/jira/browse/SPARK-3183.

-Sandy

On Tue, Aug 19, 2014 at 12:51 PM, Calvin iphcal...@gmail.com wrote:
I've set up a YARN (Hadoop 2.4.1) cluster with Spark 1.0.1, and I've been seeing some inconsistencies with out-of-memory errors (java.lang.OutOfMemoryError: unable to create new native thread) when increasing the number of executors for a simple job (wordcount).

The general format of my submission is:

spark-submit \
  --master yarn-client \
  --num-executors=$EXECUTORS \
  --executor-cores 1 \
  --executor-memory 2G \
  --driver-memory 3G \
  count.py input output

If I run without specifying the number of executors, it defaults to two (3 containers: 2 executors, 1 driver). Is there any mechanism to let a Spark application scale to the capacity of the YARN cluster automatically?

Similarly, for low numbers of executors I get what I asked for (e.g., 10 executors results in 11 containers running, 20 executors results in 21 containers, etc.) until a particular threshold: when I specify 50 containers, Spark seems to start asking for more and more containers until all the memory in the cluster is allocated and the job gets killed. I don't understand that particular behavior; if anyone has any thoughts, it would be great if you could share your experiences.

Wouldn't it be preferable to have Spark stop requesting containers if the cluster is at capacity, rather than kill the job or error out? Does anyone have any recommendations on how to tweak the number of executors in an automated manner?

Thanks,
Calvin
Re: Spark SQL Parser error
Hi Sankar,

You need to create an external table in order to specify the location of the data (i.e. using CREATE EXTERNAL TABLE user1 ... LOCATION ...). You can take a look at https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-Create/Drop/TruncateTable for reference.

Thanks,
Yin

On Thu, Aug 21, 2014 at 11:12 PM, S Malligarjunan smalligarju...@yahoo.com.invalid wrote:
Hello All,

When I execute the following query:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

CREATE TABLE user1 (time string, id string, u_id string, c_ip string, user_agent string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION 's3n://hadoop.anonymous.com/output/qa/cnv_px_ip_gnc/ds=2014-06-14/')

I am getting the following error:

org.apache.spark.sql.hive.HiveQl$ParseException: Failed to parse: CREATE TABLE user1 (time string, id string, u_id string, c_ip string, user_agent string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' LINES TERMINATED BY ' ' STORED AS TEXTFILE LOCATION 's3n://hadoop.anonymous.com/output/qa/cnv_px_ip_gnc/ds=2014-06-14/')
        at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:215)
        at org.apache.spark.sql.hive.HiveContext.hiveql(HiveContext.scala:98)
        at org.apache.spark.sql.hive.HiveContext.hql(HiveContext.scala:102)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:22)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
        at $iwC$$iwC$$iwC.<init>(<console>:29)
        at $iwC$$iwC.<init>(<console>:31)
        at $iwC.<init>(<console>:33)
        at <init>(<console>:35)

Kindly let me know what could be the issue here. I have cloned Spark from GitHub and am using Hadoop 1.0.3.

Thanks and Regards,
Sankar S.
Re: Spark SQL Parser error
Hello Yin,

I have tried the CREATE EXTERNAL TABLE command as well and I get the same error. Please help me find the root cause.

Thanks and Regards,
Sankar S.

On Friday, 22 August 2014, 22:43, Yin Huai huaiyin@gmail.com wrote:
Hi Sankar, you need to create an external table in order to specify the location of the data (i.e. using CREATE EXTERNAL TABLE user1 ... LOCATION ...).
Re: Spark SQL Parser error
Hello Yin,

Forgot to mention one thing: the same query works fine in Hive and Shark.

Thanks and Regards,
Sankar S.
Re: Finding previous and next element in a sorted RDD
It would be nice if an RDD that was massaged by OrderedRDDFunctions could know its neighbors.
importing scala libraries from python?
This is probably a bit ridiculous, but I'm wondering if it's possible to use Scala libraries from a Python module? The Cassandra connector (https://github.com/datastax/spark-cassandra-connector) is in Scala; would I need a Python version of that library to use Python Spark? Personally I have no issue with using Scala, but I'm exploring whether it'll be possible to integrate Spark into my Python Cassandra object mapper, cqlengine.

-- Jon Haddad
http://www.rustyrazorblade.com
twitter: rustyrazorblade
FetchFailed when collect at YARN cluster
Hi,

I am having this FetchFailed issue when the driver is about to collect about 2.5M lines of short strings (about 10 characters each line) from a YARN cluster with 400 nodes:

14/08/22 11:43:27 WARN scheduler.TaskSetManager: Lost task 205.0 in stage 0.0 (TID 1228, aaa.xxx.com): FetchFailed(BlockManagerId(220, aaa.xxx.com, 37899, 0), shuffleId=0, mapId=420, reduceId=205)
14/08/22 11:43:27 WARN scheduler.TaskSetManager: Lost task 603.0 in stage 0.0 (TID 1626, aaa.xxx.com): FetchFailed(BlockManagerId(220, aaa.xxx.com, 37899, 0), shuffleId=0, mapId=420, reduceId=603)

Other than this FetchFailed, I am not able to see anything else in the log file (no OOM errors shown). This does not happen when there are only 2M lines. I guessed it might be because of the Akka message size, so I used the following:

spark.akka.frameSize 100
spark.akka.timeout 200

but that did not help either. Has anyone experienced similar problems?

Thanks,
Jiayu
Re: Spark SQL Parser error
Hello Sankar,

ADD JAR in SQL is not supported at the moment. We are working on it (https://issues.apache.org/jira/browse/SPARK-2219). For now, can you try SparkContext.addJar, or use --jars your-jar to launch the spark shell?

Thanks,
Yin

On Fri, Aug 22, 2014 at 2:01 PM, S Malligarjunan smalligarju...@yahoo.com wrote:
Hello Yin/All,

@Yin - Thanks for helping. I solved the SQL parser error. I am getting the following exception now:

scala> hiveContext.hql("ADD JAR s3n://hadoop.anonymous.com/lib/myudf.jar")
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
14/08/22 17:58:55 INFO SessionState: converting to local s3n://hadoop.anonymous.com/lib/myudf.jar
14/08/22 17:58:56 ERROR SessionState: Unable to register /tmp/3d273a4c-0494-4bec-80fe-86aa56f11684_resources/myudf.jar
Exception: org.apache.spark.repl.SparkIMain$TranslatingClassLoader cannot be cast to java.net.URLClassLoader
java.lang.ClassCastException: org.apache.spark.repl.SparkIMain$TranslatingClassLoader cannot be cast to java.net.URLClassLoader
        at org.apache.hadoop.hive.ql.exec.Utilities.addToClassPath(Utilities.java:1680)

Thanks and Regards,
Sankar S.
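To make the two suggestions above concrete, a minimal sketch (the local path is hypothetical): start the shell with the jar on the command line, e.g. bin/spark-shell --jars /path/to/myudf.jar, or register it programmatically from inside the shell:

sc.addJar("s3n://hadoop.anonymous.com/lib/myudf.jar")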
[PySpark] order of values in GroupByKey()
Is there any way to control the ordering of values for each key during a groupByKey() operation? Is there some sort of implicit ordering in place already? Thanks Arpan
spark streaming - realtime reports - storing current state of resources
Hi All,

I have a set of 1000k workers of a company, with different attributes associated with them. I would like to be able, at any time, to report on their current state and to update the reports every 5 seconds. Spark Streaming allows you to receive events about the workers' state changes and process them. Where would I store the state of the 1000k workers so I can change it in real time and query it in real time? With Spark Streaming?

thanks,
Ali
cache table with JDBC
I am using Spark's Thrift server to connect to Hive and use JDBC to issue queries. Is there a way to cache a table in Spark by using a JDBC call?

Thanks,
Ken
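A sketch of what may work, assuming this build of the Thrift server passes Spark SQL's CACHE TABLE statement through (URL, credentials, and table name are hypothetical):

import java.sql.DriverManager

object CacheTableExample {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection(
      "jdbc:hive2://localhost:10000/default", "user", "")
    val stmt = conn.createStatement()
    stmt.execute("CACHE TABLE my_table") // subsequent scans would hit the in-memory copy
    stmt.close()
    conn.close()
  }
}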
Re: Hive From Spark
I thought the fix had been pushed to the apache master, ref. commit "[SPARK-2848] Shade Guava in uber-jars" by Marcelo Vanzin on 8/20. So my previous email was based on my own build of the apache master, which turned out not to be working yet.

Marcelo: please correct me if I got that commit wrong.

Thanks,
Du

On 8/22/14, 11:41 AM, Marcelo Vanzin van...@cloudera.com wrote:
SPARK-2420 is fixed. I don't think it will be in 1.1, though - might be too risky at this point. I'm not familiar with spark-sql.

On Fri, Aug 22, 2014 at 11:25 AM, Andrew Lee alee...@hotmail.com wrote:
Hopefully there could be some progress on SPARK-2420. It looks like shading may be the voted solution over downgrading. Any idea when this will happen? Could it happen in Spark 1.1.1 or Spark 1.1.2?

By the way, regarding bin/spark-sql: is this more of a debugging tool for Spark jobs integrating with Hive? How do people use spark-sql? I'm trying to understand the rationale and motivation behind this script, any idea?

Date: Thu, 21 Aug 2014 16:31:08 -0700
Subject: Re: Hive From Spark
From: van...@cloudera.com
To: l...@yahoo-inc.com.invalid
CC: user@spark.apache.org; u...@spark.incubator.apache.org; pwend...@gmail.com

Hi Du,

I don't believe the Guava change has made it to the 1.1 branch. The Guava doc says hashInt was added in 12.0, so what's probably happening is that you have an old version of Guava in your classpath before the Spark jars. (Hadoop ships with Guava 11, so that may be the source of your problem.)

On Thu, Aug 21, 2014 at 4:23 PM, Du Li l...@yahoo-inc.com.invalid wrote:
Hi,

This guava dependency conflict problem should have been fixed as of yesterday according to https://issues.apache.org/jira/browse/SPARK-2420

However, I just got java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode; from the following code snippet and "mvn3 test" on Mac. I built the latest version of Spark (1.1.0-SNAPSHOT) and installed the jar files to the local maven repo. In my pom file I explicitly excluded guava from almost all possible dependencies, such as spark-hive_2.10-1.1.0-SNAPSHOT and hadoop-client. This snippet is abstracted from a larger project, so the pom.xml includes many dependencies although not all are required by this snippet. The pom.xml is attached. Does anybody know how to fix it?
Thanks,
Du

---
package com.myself.test

import org.scalatest._
import org.apache.hadoop.io.{NullWritable, BytesWritable}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._

class MyRecord(name: String) extends Serializable {
  def getWritable(): BytesWritable = {
    new BytesWritable(Option(name).getOrElse("\\N").toString.getBytes("UTF-8"))
  }

  final override def equals(that: Any): Boolean = {
    if (!that.isInstanceOf[MyRecord]) false
    else {
      val other = that.asInstanceOf[MyRecord]
      this.getWritable == other.getWritable
    }
  }
}

class MyRecordTestSuite extends FunSuite {
  // construct a MyRecord by Consumer.schema
  val rec: MyRecord = new MyRecord("James Bond")

  test("generated SequenceFile should be readable from spark") {
    val path = "./testdata/"
    val conf = new SparkConf(false).setMaster("local")
      .setAppName("test data exchange with Hive")
    conf.set("spark.driver.host", "localhost")
    val sc = new SparkContext(conf)
    val rdd = sc.makeRDD(Seq(rec))
    rdd.map((x: MyRecord) => (NullWritable.get(), x.getWritable()))
      .saveAsSequenceFile(path)
    val bytes = sc.sequenceFile(path, classOf[NullWritable],
      classOf[BytesWritable]).first._2
    assert(rec.getWritable() == bytes)
    sc.stop()
    System.clearProperty("spark.driver.port")
  }
}

From: Andrew Lee alee...@hotmail.com
Reply-To: user@spark.apache.org
Date: Monday, July 21, 2014 at 10:27 AM
To: user@spark.apache.org, u...@spark.incubator.apache.org
Subject: RE: Hive From Spark

Hi All,

Currently, if you are running the Spark HiveContext API with Hive 0.12, it won't work due to the following 2 libraries, which are not consistent with Hive 0.12 and Hadoop (Hive libs align with Hadoop libs, and as a common practice they should be consistent to be interoperable). These are under discussion in the 2 JIRA tickets:

https://issues.apache.org/jira/browse/HIVE-7387
https://issues.apache.org/jira/browse/SPARK-2420

When I ran the command with a tweaked classpath and a build of Spark 1.0.1-rc3, I was able to create a table through HiveContext; however, when I fetch the data, it breaks due to incompatible API calls in Guava. This is critical since it needs to map the columns to the RDD schema. Hive and Hadoop are using an older version of the Guava libraries (11.0.1), whereas Spark Hive is using Guava 14.0.1+. The community isn't willing to downgrade to 11.0.1 which is the
Re: wholeTextFiles not working with HDFS
I had the same issue with spark-1.0.2-bin-hadoop1, and indeed the issue seems related to Hadoop 1. When switching to spark-1.0.2-bin-hadoop2, the issue disappears.
Re: AppMaster OOME on YARN
This is all that I see related to spark.MapOutputTrackerMaster in the master logs after the OOME:

14/08/21 13:24:45 ERROR ActorSystemImpl: Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-27] shutting down ActorSystem [spark]
java.lang.OutOfMemoryError: Java heap space
Exception in thread Thread-59 org.apache.spark.SparkException: Error communicating with MapOutputTracker
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:108)
        at org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:114)
        at org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:319)
        at org.apache.spark.SparkEnv.stop(SparkEnv.scala:82)
        at org.apache.spark.SparkContext.stop(SparkContext.scala:984)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$1.run(ApplicationMaster.scala:449)
Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://spark/user/MapOutputTracker#112553370]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:104)

"2. Every executor will process 10+TB/2000 = 5G of data. ReduceByKey will create a hashtable of the unique lines from this 5G of data and keep it in memory. It may exceed 16G."

So you mean the master gets that information from individual nodes and keeps it in memory?

On Aug 21, 2014, at 8:18 PM, Nieyuan qiushuiwuh...@gmail.com wrote:
1. At the beginning of the reduce task, the master will deliver the map output info to every executor. You can check stderr to find the size of the map output info. It should be:
spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is xxx bytes
2. Every executor will process 10+TB/2000 = 5G of data. ReduceByKey will create a hashtable of the unique lines from this 5G of data and keep it in memory. It may exceed 16G.
ODBC and HiveThriftServer2
Is it possible to connect to the Thrift server using an ODBC client (via an ODBC-JDBC bridge)? My Thrift server is built from branch-1.0-jdbc using Hive 0.13.1.
Re: [PySpark] order of values in GroupByKey()
On 08/22/2014 04:32 PM, Arpan Ghosh wrote:
Is there any way to control the ordering of values for each key during a groupByKey() operation? Is there some sort of implicit ordering in place already? Thanks Arpan

there's no implicit ordering in place. the same holds for the order of keys, unless you use sortByKey. what are you trying to achieve?

best,
matt
Spark: why standalone mode cannot set executor number
As far as I know, only YARN mode can set --num-executors. It has been shown that setting more executors can perform better than setting only 1 or 2 executors with large memory and many cores; see http://apache-spark-user-list.1001560.n3.nabble.com/executor-cores-vs-num-executors-td9878.html

Why does standalone mode not provide a num-executors parameter, instead of using the spread-out strategy by default to generate executors? Can anyone explain this in detail?

Thanks :)
Re: What about implementing various hypothesis test for LogisticRegression in MLlib
Hi Xiangrui,

You can refer to "An Introduction to Statistical Learning with Applications in R"; there are many standard hypothesis tests to perform for linear regression and logistic regression. They should be implemented first; then we will list some other tests, which are also important when using logistic regression to build scorecards.

Xiaobo Gu

-- Original --
From: Xiangrui Meng men...@gmail.com
Sent: Wednesday, Aug 20, 2014 2:18 PM
To: guxiaobo1...@qq.com
Cc: user@spark.apache.org
Subject: Re: What about implementing various hypothesis test for LogisticRegression in MLlib

We implemented chi-squared tests in v1.1: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala#L166 and we will add more after v1.1. Feedback on which tests should come first would be greatly appreciated.

-Xiangrui

On Tue, Aug 19, 2014 at 9:50 PM, guxiaobo1982 guxiaobo1...@qq.com wrote:
Hi,

From the documentation I think only the model-fitting part is implemented. What about the various hypothesis tests and performance indexes used to evaluate the model fit?

Regards,
Xiaobo Gu
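For reference, a minimal sketch of the chi-squared API linked above, as of the v1.1 code (the observed counts are made up):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics

// goodness-of-fit test against a uniform expected distribution
val observed = Vectors.dense(85, 90, 125)
val result = Statistics.chiSqTest(observed)
println(result.pValue) // also: statistic, degreesOfFreedom, nullHypothesis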
Re: [PySpark] order of values in GroupByKey()
you can kv.mapValues(sorted), but that's definitely less efficient than sorting during the groupBy.

you could try using combineByKey directly w/ heapq...

from heapq import heapify, heappush

def createCombiner(x):
    return [x]

def mergeValues(xs, x):
    heappush(xs, x)
    return xs

def mergeCombiners(a, b):
    a.extend(b)
    heapify(a)  # heap lists are not fully sorted, so re-heapify the concatenation
    return a

rdd.combineByKey(createCombiner, mergeValues, mergeCombiners)

(the per-key result is heap-ordered, so a final mapValues(sorted) is still needed to get fully sorted values)

best,
matt

On 08/22/2014 10:41 PM, Arpan Ghosh wrote:
I was grouping time series data by a key. I want the values to be sorted by timestamp after the grouping.
Re: why classTag not typeTag?
TypeTags are unfortunately not thread-safe in Scala 2.10. They were still somewhat experimental at the time, so we decided not to use them. If you want, though, you can probably design other APIs that pass a TypeTag around (e.g. make a method that takes an RDD[T] but also requires an implicit TypeTag[T]).

Matei

On Aug 22, 2014, at 9:15 AM, Mohit Jaggi mohitja...@gmail.com wrote:
Folks, I am wondering why Spark uses ClassTag in RDD[T: ClassTag] instead of the more capable TypeTag option. I have some code that needs TypeTag functionality, and I don't know if a TypeTag can be converted to a ClassTag. Mohit.
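Following that suggestion, a minimal sketch of carrying a TypeTag alongside an RDD (the method itself is hypothetical):

import scala.reflect.runtime.universe._
import org.apache.spark.rdd.RDD

// the implicit TypeTag is supplied at the call site, independent of the RDD's ClassTag
def describe[T: TypeTag](rdd: RDD[T]): String =
  s"RDD of ${typeOf[T]}"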
Re: Installation On Windows machine
You should be able to just download / unzip a Spark release and run it on a Windows machine with the provided .cmd scripts, such as bin\spark-shell.cmd. The scripts to launch a standalone cluster (e.g. start-all.sh) won't work on Windows, but you can launch a standalone cluster manually using:

bin\spark-class org.apache.spark.deploy.master.Master
bin\spark-class org.apache.spark.deploy.worker.Worker spark://master:port

For submitting jobs to YARN instead of the standalone cluster, spark-submit.cmd *may* work but I don't think we've tested it heavily. If you find issues with that, please let us know. But overall the instructions should be the same as on Linux, except you use the .cmd scripts instead of the .sh ones.

Matei

On Aug 22, 2014, at 3:01 AM, Mishra, Abhishek abhishek.mis...@xerox.com wrote:
FetchFailedException from Block Manager
Anyone know why I would see this in a bunch of executor logs? Is it just classic overloading of the cluster network, OOM, or something else? If anyone's seen this before, what do I need to tune to make some headway here?

Thanks,
Victor

Caused by: org.apache.spark.FetchFailedException: Fetch failed: BlockManagerId(116, xxx, 54761, 0) 110 32 38
        at org.apache.spark.BlockStoreShuffleFetcher.org$apache$spark$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
        at org.apache.spark.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:77)
        at org.apache.spark.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:77)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$4.apply(PairRDDFunctions.scala:107)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$4.apply(PairRDDFunctions.scala:106)
        at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
        at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
Re: Configuration for big worker nodes
I think it depends on your job. From my personal experience when running TB-scale data: Spark got connection-loss failures if I used a big JVM with large memory, but with more executors with small memory it ran very smoothly. I was running Spark on YARN.

Thanks.

Zhan Zhang

On Aug 21, 2014, at 3:42 PM, soroka21 sorok...@gmail.com wrote:
Hi,

I have relatively big worker nodes. What would be the best worker configuration for them? Should I use all memory for the JVM and utilize all cores when running my jobs? Each node has 2x10-core CPUs and 160GB of RAM. The cluster has 4 nodes connected with a 10G network.