Re: amp lab spark streaming twitter example
I think your sparkUrl points to an invalid cluster URL. Just make sure you are giving the correct URL (the one you see at the top left of the master:8080 web UI). Thanks Best Regards

On Tue, Aug 26, 2014 at 11:07 AM, Forest D dev24a...@gmail.com wrote: Hi Jonathan, Thanks for the reply. I ran other exercises (movie recommendation and GraphX) on the same cluster and did not see these errors. So I think this might not be related to the memory setting. Thanks, Forest

On Aug 24, 2014, at 10:27 AM, Jonathan Haddad j...@jonhaddad.com wrote: Could you be hitting this? https://issues.apache.org/jira/browse/SPARK-3178

On Sun, Aug 24, 2014 at 10:21 AM, Forest D dev24a...@gmail.com wrote: Hi folks, I have been trying to run the AMPLab's Twitter streaming example (http://ampcamp.berkeley.edu/big-data-mini-course/realtime-processing-with-spark-streaming.html) for the last 2 days. I have encountered the same error messages as shown below:

14/08/24 17:14:22 ERROR client.AppClient$ClientActor: All masters are unresponsive! Giving up.
14/08/24 17:14:22 ERROR cluster.SparkDeploySchedulerBackend: Spark cluster looks dead, giving up.
[error] (Thread-39) org.apache.spark.SparkException: Job aborted: Spark cluster looks down
org.apache.spark.SparkException: Job aborted: Spark cluster looks down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:262)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
[trace] Stack trace suppressed: run last compile:run for the full output.
--- Time: 1408900463000 ms ---
14/08/24 17:14:23 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
--- Time: 1408900464000 ms ---
--- Time: 1408900465000 ms ---
--- Time: 1408900466000 ms ---
--- Time: 1408900467000 ms ---
--- Time: 1408900468000 ms ---
--- Time: 1408900469000 ms ---
--- Time: 1408900470000 ms ---
--- Time: 1408900471000 ms ---
--- Time: 1408900472000 ms ---
--- Time: 1408900473000 ms ---
--- Time: 1408900474000 ms ---
--- Time: 1408900475000 ms ---
--- Time: 1408900476000 ms
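Relating to the sparkUrl advice above, a minimal sketch (not from the thread; host and port are placeholders) of wiring the exact master URL from the web UI into the driver:

    import org.apache.spark.{SparkConf, SparkContext}

    // The string passed to setMaster must match the spark://host:port URL
    // shown at the top left of the master's web UI (http://<master>:8080).
    val conf = new SparkConf()
      .setAppName("TwitterStreamingExample")
      .setMaster("spark://ec2-12-34-56-78.compute-1.amazonaws.com:7077") // placeholder host
    val sc = new SparkContext(conf)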
Re: Only master is really busy at KMeans training
With a lower number of partitions, I keep losing executors during collect at KMeans.scala:283. The error message is ExecutorLostFailure (executor lost). The program recovers by automatically repartitioning the whole dataset (126G), which takes very long and seems to only delay the inevitable failure. Is there a recommended solution to this issue? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Only-master-is-really-busy-at-KMeans-training-tp12411p12803.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark webUI - application details page
Have a look at the history server, looks like you have enabled history server on your local and not on the remote server. http://people.apache.org/~tdas/spark-1.0.0-rc11-docs/monitoring.html Thanks Best Regards On Tue, Aug 26, 2014 at 7:01 AM, SK skrishna...@gmail.com wrote: Hi, I am able to access the Application details web page from the master UI page when I run Spark in standalone mode on my local machine. However, I am not able to access it when I run Spark on our private cluster. The Spark master runs on one of the nodes in the cluster. I am able to access the spark master UI at spark://master-url:8080. It shows the listing of all the running and completed apps. When I click on the completed app, and access the Application details link, the link points to: master-url/app/?appId=app-idvalue When I view the page source to view the html source, the href portion is blank (). However, on my local machine, when I click the Application detail link for a completed app, it correctly points to master-url/history/app-id and when I view the page's html source, the href portion points to /history/app-id On the cluster, I have set spark.eventLog.enabled to true in $SPARK_HOME/conf/spark-defaults.conf on the master node as well as all the slave nodes. I am using spark 1.0.1 on the cluster. I am not sure why I am able to access the application details for completed apps when the app runs on my local machine but not for the apps that run on our cluster, although in both cases I am using spark 1.0.1 in standalone mode. Do I need to do any additional configuration to enable this history on the cluster? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-webUI-application-details-page-tp3490p12792.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
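A minimal sketch of the relevant standalone-mode settings (values are placeholders, assuming Spark 1.0.x; see the monitoring docs linked above):

    # In $SPARK_HOME/conf/spark-defaults.conf on the node the driver runs on:
    spark.eventLog.enabled   true
    # Write event logs somewhere the master/history UI can read, e.g. HDFS:
    spark.eventLog.dir       hdfs://namenode:8021/spark-event-logs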
Re: Block input-* already exists on this machine; not re-adding it warnings
Answering my own question: it seems that the warnings are expected, as explained by TD at http://apache-spark-user-list.1001560.n3.nabble.com/streaming-questions-td3281.html. Here is what he wrote: Spark Streaming is designed to replicate the received data within the machines in a Spark cluster for fault-tolerance. However, when you are running in the local mode, since there is only one machine, the blocks of data aren't able to replicate. This is expected and safe to ignore in local mode. I was indeed running it in local mode, and hence it seems that I can safely ignore such warnings. Thanks, Aniket

On 22 August 2014 15:54, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: Hi everyone. I back-ported kinesis-asl to Spark 1.0.2 and ran a quick test on my local machine. It seems to be working fine, but I keep getting the following warnings. I am not sure what it means and whether it is something to worry about or not. 2014-08-22 15:53:43,803 [pool-1-thread-7] WARN o.apache.spark.storage.BlockManager - Block input-0-1408703023600 already exists on this machine; not re-adding it Thoughts? Thanks, Aniket
Re: How do you hit breakpoints using IntelliJ In functions used by an RDD
You need to run your app in local mode (aka master=local[2]) to get it debugged locally. If you are running it on a cluster, then you can use the remote debugging feature: http://stackoverflow.com/questions/19128264/how-to-remote-debug-in-intellij-12-1-4 For remote debugging, you need to pass the following JVM options: -Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=4000,suspend=n and configure your IDE on that given port (4000) for remote debugging. Thanks Best Regards

On Tue, Aug 26, 2014 at 1:32 AM, Sean Owen so...@cloudera.com wrote: PS from an offline exchange -- yes, more is being called here; the rest is the standard WordCount example. The trick was to make sure the task executes locally, and calling setMaster("local") on SparkConf in the example code does that. That seems to work fine in IntelliJ for debugging this.

On Mon, Aug 25, 2014 at 6:41 PM, Steve Lewis lordjoe2...@gmail.com wrote: That was not quite in English. My flatMap code is shown below. I know the code is called since the answers are correct, but I would like to put a break point in dropNonLetters to make sure that code works properly. I am running in the IntelliJ debugger but believe the code is executing on a Spark worker. I am not sure what magic IntelliJ uses to hook up a debugger to a worker but hope it is possible.

    public class WordsMapFunction implements FlatMapFunction<String, String> {
        private static final Pattern SPACE = Pattern.compile(" ");

        public Iterable<String> call(String s) {
            String[] split = SPACE.split(s);
            for (int i = 0; i < split.length; i++) {
                split[i] = regularizeString(split[i]);
            }
            return Arrays.asList(split);
        }

        public static String dropNonLetters(String s) {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < s.length(); i++) {
                char c = s.charAt(i);
                if (Character.isLetter(c))
                    sb.append(c);
            }
            return sb.toString();
        }

        public static String regularizeString(String inp) {
            inp = inp.trim();
            inp = inp.toUpperCase();
            return dropNonLetters(inp);
        }
    }

On Mon, Aug 25, 2014 at 10:35 AM, Sean Owen so...@cloudera.com wrote: flatMap() is a transformation only. Calling it by itself does nothing; it just describes the relationship between one RDD and another. You should see it swing into action if you invoke an action, like count(), on the words RDD.

On Mon, Aug 25, 2014 at 6:32 PM, Steve Lewis lordjoe2...@gmail.com wrote: I was able to get JavaWordCount running with a local instance under IntelliJ. In order to do so I needed to use maven to package my code and call

    String[] jars = { "/SparkExamples/target/word-count-examples_2.10-1.0.0.jar" };
    sparkConf.setJars(jars);

After that the sample ran properly, and in the debugger I could set break points in main. However, when I do something like

    JavaRDD<String> words = lines.flatMap(new WordsMapFunction());

where WordsMapFunction is a separate class like

    public static class WordsMapFunction implements FlatMapFunction<String, String> {
        private static final Pattern SPACE = Pattern.compile(" ");
        public Iterable<String> call(String s) {
            String[] split = SPACE.split(s);
            for (int i = 0; i < split.length; i++) {
                split[i] = toUpperCase(split[i]);
            }
            return Arrays.asList(split);
        }
    }

breakpoints set in WordsMapFunction are never hit. Most of the interesting functionality in the problems I am trying to solve is in the FlatMapFunction and Function2 code, and this is the functionality I will need to examine in more detail. Has anyone figured out how to configure a project to hit breakpoints in these functions? -- Steven M.
Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
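For reference, a hedged sketch of attaching those JVM options to the executors (not from the thread; assumes spark.executor.extraJavaOptions, available in Spark 1.0+):

    # In conf/spark-defaults.conf; suspend=y would make each executor wait
    # for the debugger to attach on port 4000 before running any task.
    spark.executor.extraJavaOptions  -Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=4000,suspend=n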
Re: Request for Help
Hi. Not sure this is the right way of doing it, but if you can create a PairRDDFunctions from that RDD then you can use the following piece of code to access the filenames from the RDD:

    PairRDDFunctions<K, V> ds = ...;
    // getting the name and path of the file for each partition
    for (int i = 0; i < ds.values().getPartitions().length; i++) {
        UnionPartition upp = (UnionPartition) ds.values().getPartitions()[i];
        NewHadoopPartition npp = (NewHadoopPartition) upp.split();
        System.out.println("File " + npp.serializableHadoopSplit().value().toString());
    }

Thanks Best Regards

On Tue, Aug 26, 2014 at 1:25 AM, yh18190 yh18...@gmail.com wrote: Hi Guys, I just want to know whether there is any way to determine which file is being handled by Spark from a group of files given as input inside a directory. Suppose I have 1000 files which are given as input; I want to determine which file is currently being handled by the Spark program, so that if any error creeps in at any point of time we can easily identify that particular file as the faulty one. Please let me know your thoughts. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Request-for-Help-tp12776.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
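A simpler alternative worth noting (not mentioned in the thread; assumes Spark 1.0+, where sc.wholeTextFiles is available) is to read the directory so that each record is keyed by its file path, which makes the faulty file identify itself:

    // Each element is (filePath, fileContent); best suited to many small files.
    val files = sc.wholeTextFiles("hdfs:///input/dir") // placeholder path
    files.foreach { case (path, content) =>
      // process `content`; on failure, log `path` to pinpoint the bad file
    }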
My Post Related Query
I wrote a post on this forum but it shows the message "This post has NOT been accepted by the mailing list yet." above my post. How long will it take to get it posted? Regards, Sandeep Vaid +91 - 09881710301
Re: Running Wordcount on large file stucks and throws OOM exception
Hello, it's me again. Now I've got an explanation for the behaviour. It seems that the driver memory is not large enough to hold the whole result set of saveAsTextFile in memory, and then the OOM occurs. I tested it with a filter step that removes KV pairs with a word count smaller than 100,000, and now the job finishes successfully. But is this the desired behaviour of Spark, that the available driver memory limits the size of the result set? Or is my explanation wrong? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-Wordcount-on-large-file-stucks-and-throws-OOM-exception-tp12747p12809.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Pair RDD
    val node = textFile.map { line =>
      val fields = line.split("\\s+")
      (fields(0), fields(1))
    }

Then you can manipulate the node RDD with the PairRDD functions. 2014-08-26 12:55 GMT+08:00 Deep Pradhan pradhandeep1...@gmail.com: Hi, I have an input file of a graph in the format source_node dest_node. When I use sc.textFile, it will change the entire text file into an RDD. How can I transform the file into key, value pairs and then eventually into paired RDDs? Thank You
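As a small follow-up sketch (hypothetical, continuing the snippet above), typical PairRDD operations on node would look like:

    val neighbours = node.groupByKey()                          // (source, all destinations)
    val outDegrees = node.mapValues(_ => 1).reduceByKey(_ + _)  // out-degree per source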
Re: Trying to run SparkSQL over Spark Streaming
Thanks for the reply. Yes, it doesn't seem doable straight away. Someone suggested this: "For each of your streams, first create an empty RDD that you register as a table, obtaining an empty table. For your example, let's say you call it allTeenagers. Then, for each of your queries, use SchemaRDD's insertInto method to add the result to that table: teenagers.insertInto("allTeenagers"). If you do this with both your streams, creating two separate accumulation tables, you can then join them using a plain old SQL query." So I was trying it but can't seem to use the insertInto method in the correct way. Something like:

    var p1 = Person("Hari", 22)
    val rdd1 = sc.parallelize(Array(p1))
    rdd1.registerAsTable("data")

    var p2 = Person("sagar", 22)
    var rdd2 = sc.parallelize(Array(p2))
    rdd2.insertInto("data")

is giving the error: java.lang.AssertionError: assertion failed: No plan for InsertIntoTable Map(), false. Any thoughts? Thanks

Hi again, On Tue, Aug 26, 2014 at 10:13 AM, Tobias Pfeiffer tgp@... wrote: On Mon, Aug 25, 2014 at 7:11 PM, praveshjain1991 praveshjain1991@... wrote: If you want to issue an SQL statement on streaming data, you must have both the registerAsTable() and the sql() call *within* the foreachRDD(...) block, or -- as you experienced -- the table name will be unknown. Since this is the case, is there any way to run a join over data received from two different streams? Couldn't you do dstream1.join(dstream2).foreachRDD(...)? Ah, I guess you meant something like SELECT * FROM dstream1 JOIN dstream2 WHERE ...? I don't know if that is possible. Doesn't seem easy to me; I don't think that's doable with the current codebase... Tobias -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Trying-to-run-SparkSQL-over-Spark-Streaming-tp12530p12812.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
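For completeness, a minimal sketch of Tobias's dstream-level join suggestion (hypothetical field names; assumes both streams can be keyed the same way):

    // Key both streams by name, then join batch-by-batch.
    val byName1 = dstream1.map(p => (p.name, p.age))
    val byName2 = dstream2.map(p => (p.name, p.city))
    byName1.join(byName2).foreachRDD { rdd =>
      rdd.take(10).foreach(println) // (name, (age, city))
    }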
Re: Spark - GraphX pregel like with global variables (accumulator / broadcast)
I actually tried without unpersisting first, but given the performance I tried to add these in order to free the memory. After your answer I tried to remove them again, but without any change in the execution time... Looking at the web interface, I can see that the mapPartitions at GraphImpl.scala:184 step just after the foreach (so I guess computing messages, even if I would have bet on the innerJoin...) is taking 2s - 6s - 17s - 48s - 15m+ (sometimes ending with GC overhead limit exceeded) and has shuffle writes of 9MB - 30MB - 73MB - 162MB - never completed. Is this still related to unpersisting? I'm mostly a mathematician, so I'm not well acquainted with the depths of Spark... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-GraphX-pregel-like-with-global-variables-accumulator-broadcast-tp12742p12813.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Losing Executors on cluster with RDDs of 100GB
Hi, Please give it a try after changing the worker memory such that worker memory > executor memory. Thanks Regards, Meethu M

On Friday, 22 August 2014 5:18 PM, Yadid Ayzenberg ya...@media.mit.edu wrote: Hi all, I have a spark cluster of 30 machines, 16GB / 8 cores on each, running in standalone mode. Previously my application was working well (several RDDs, the largest being around 50G). When I started processing larger amounts of data (RDDs of 100G), my app is losing executors. Currently I'm just loading them from a database, repartitioning and persisting to disk (with replication x2). I have spark.executor.memory=9G, memoryFraction=0.5, spark.worker.timeout=120, spark.akka.askTimeout=30, spark.storage.blockManagerHeartBeatMs=3. I haven't changed the default of my worker memory, so it's at 512m (should this be larger?). I've been getting the following messages from my app:

[error] o.a.s.s.TaskSchedulerImpl - Lost executor 3 on myserver1: worker lost
[error] o.a.s.s.TaskSchedulerImpl - Lost executor 13 on myserver2: Unknown executor exit code (137) (died from signal 9?)
[error] a.r.EndpointWriter - AssociationError [akka.tcp://spark@master:59406] - [akka.tcp://sparkExecutor@myserver2:32955]: Error [Association failed with [akka.tcp://sparkExecutor@myserver2:32955]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkexecu...@myserver2.com:32955] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: myserver2/198.18.102.160:32955 ]
[error] a.r.EndpointWriter - AssociationError [akka.tcp://spark@master:59406] - [akka.tcp://spark@myserver1:53855]: Error [Association failed with [akka.tcp://spark@myserver1:53855]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://spark@myserver1:53855] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: myserver1/198.18.102.160:53855 ]

The worker logs and executor logs do not contain errors. Any ideas what the problem is? Yadid
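A brief sketch of Meethu's suggestion (placeholder values; assumes standalone mode, where each worker's cap is set in spark-env.sh):

    # conf/spark-env.sh on every worker node: give the worker more memory
    # than the executors it hosts.
    export SPARK_WORKER_MEMORY=12g
    # and keep the application's executors below that cap, e.g.
    # spark.executor.memory=9g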
Re: Printing the RDDs in SparkPageRank
println(parts(0)) does not solve the problem. It does not work.

On Mon, Aug 25, 2014 at 1:30 PM, Sean Owen so...@cloudera.com wrote: On Mon, Aug 25, 2014 at 7:18 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: When I add parts(0).collect().foreach(println) and parts(1).collect().foreach(println) for printing parts, I get the following error: not enough arguments for method collect: (pf: PartialFunction[Char,B])(implicit bf: scala.collection.generic.CanBuildFrom[String,B,That])That. Unspecified value parameter pf. parts(0).collect().foreach(println)

    val links = lines.map { s =>
      val parts = s.split("\\s+")
      (parts(0), parts(1)) // I want to print this parts
    }.distinct().groupByKey().cache()

Within this code, you are working in a simple Scala function. parts is an Array[String]. parts(0) is a String. You can just println(parts(0)). You are not calling RDD.collect() there, but collect() on a String, a sequence of Char. However, note that this will print the String on the worker that executes this, not the driver. Maybe you want to print the result right after this map function? Then break this into two statements and print the result of the first. You already are doing that in your code. A good formula is actually take(10) rather than collect() in case the RDD is huge.
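A concrete sketch of Sean's "break this into two statements" suggestion (names follow the snippet above):

    val pairs = lines.map { s =>
      val parts = s.split("\\s+")
      (parts(0), parts(1))
    }
    pairs.take(10).foreach(println) // prints a sample on the driver
    val links = pairs.distinct().groupByKey().cache()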
Key-Value in PairRDD
I have the following code

    val nodes = lines.map { s =>
      val fields = s.split("\\s+")
      (fields(0), fields(1))
    }.distinct().groupByKey().cache()

and when I print out the nodes RDD I get the following:

    (4,ArrayBuffer(1))
    (2,ArrayBuffer(1))
    (3,ArrayBuffer(1))
    (1,ArrayBuffer(3, 2, 4))

Now, I want to print only the key part of the RDD and also the maximum value among the keys. How should I do that? Thank You
Re: Key-Value in PairRDD
I'd suggest first reading the scaladoc for RDD and PairRDDFunctions to familiarize yourself with all the operations available: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions You'll quickly find keys() and max(). On Tue, Aug 26, 2014 at 10:54 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: I have the following code val nodes = lines.map(s ={ val fields = s.split(\\s+) (fields(0),fields(1)) }).distinct().groupByKey().cache() and when I print out the nodes RDD I get the following (4,ArrayBuffer(1)) (2,ArrayBuffer(1)) (3,ArrayBuffer(1)) (1,ArrayBuffer(3, 2, 4)) Now, I want to print only the key part of the RDD and also the maximum value among the keys. How should I do that? Thank You - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
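For the concrete case above, a short sketch using the two operations the reply points to (assuming the keys are numeric strings, as in the sample output):

    nodes.keys.collect().foreach(println)      // just the key part
    val maxKey = nodes.keys.map(_.toInt).max() // 4 for the sample data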
Re: Storage Handlers in Spark SQL
It seems he means to query an RDBMS or Cassandra using Spark SQL, i.e. multiple data sources for Spark SQL. I looked through the link he posted: https://docs.wso2.com/display/BAM241/Creating+Hive+Queries+to+Analyze+Data#CreatingHiveQueriestoAnalyzeData-CreatingHivetablesforvariousdatasources Using their storage handlers, users can create a Hive external table from a C* table or an RDBMS table (JDBC). So Niranda, maybe you can take a look at this API: https://issues.apache.org/jira/browse/SPARK-2179 and there is some doc in the pull request pool: https://github.com/apache/spark/pull/1774 There is a similar implementation to your JDBC storage handlers in Spark SQL; it could also be a sample of the public API for DataTypes and Schema: https://github.com/apache/spark/pull/1612 (https://issues.apache.org/jira/browse/SPARK-2710) And, in some other user-list threads, I saw that some kind of C* mapper is also in development by Datastax? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Re-Storage-Handlers-in-Spark-SQL-tp12780p12818.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
spark.default.parallelism bug?
Hi, consider the following code:

    import org.apache.spark.{SparkContext, SparkConf}

    object ParallelismBug extends App {
      var sConf = new SparkConf()
        .setMaster("spark://hostName:7077") // .setMaster("local[4]")
        .set("spark.default.parallelism", "7") // or without it
      val sc = new SparkContext(sConf)
      val rdd = sc.textFile("input/100") // val rdd = sc.parallelize(Array.range(1, 100))
      val rdd2 = rdd.intersection(rdd)
      println("rdd: " + rdd.partitions.size + " rdd2: " + rdd2.partitions.size)
    }

Suppose that input/100 contains 100 files. In the above configuration the output is rdd: 100 rdd2: 7, which seems OK. When we don't set parallelism, the output is rdd: 100 rdd2: 100, but according to https://spark.apache.org/docs/latest/configuration.html#execution-behavior it should be rdd: 100 rdd2: 2 (on my 1-core machine). But when rdd is defined using sc.parallelize, the results seem OK: rdd: 2 rdd2: 2. Moreover, when the master is local[4] and we set parallelism, the result is rdd: 100 rdd2: 4 instead of rdd: 100 rdd2: 7. And when we don't set parallelism, it behaves like with master spark://hostName:7077. Am I misunderstanding something, or is it a bug? Thanks, Grzegorz
Re: Spark Streaming Output to DB
Hello People, I'm using Java Spark Streaming. I'm just wondering: can I make a simple JDBC connection in the JavaDStream map() method? Or do I need to create a JDBC connection for each JavaPairDStream, after the map task? Kindly give your thoughts. Cheers, Ravi Sharma
Prevent too many partitions
Hi, I have many union operations in my application. But union increases the number of partitions of the following RDDs, and performance on more partitions is sometimes very slow. Is there any cleaner way to prevent the growing number of partitions than adding coalesce(numPartitions) after each union? Thanks, Grzegorz
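A minimal sketch of the workaround being asked about (names are placeholders): union's result has the sum of its inputs' partition counts, so the count is shrunk right after.

    val combined = rdd1.union(rdd2)                 // partitions = rdd1's + rdd2's
    val trimmed  = combined.coalesce(numPartitions) // no shuffle by default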
Re: Spark Streaming Output to DB
Yes, you can open a JDBC connection at the beginning of the map method, then close this connection at the end of map(), and in between you can use this connection. Thanks Best Regards On Tue, Aug 26, 2014 at 6:12 PM, Ravi Sharma raviprincesha...@gmail.com wrote: Hello People, I'm using Java Spark Streaming. I'm just wondering: can I make a simple JDBC connection in the JavaDStream map() method? Or do I need to create a JDBC connection for each JavaPairDStream, after the map task? Kindly give your thoughts. Cheers, Ravi Sharma
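A hedged sketch of that idea, in Scala for brevity (connection details are placeholders). A common refinement over opening a connection per record is one connection per partition:

    import java.sql.DriverManager

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // one connection per partition, reused for all its records
        val conn = DriverManager.getConnection("jdbc:mysql://host/db", "user", "pass")
        try {
          records.foreach { r =>
            // ... write r using conn ...
          }
        } finally {
          conn.close()
        }
      }
    }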
Re: Low Level Kafka Consumer for Spark
Hi, As I understand, your problem is similar to this JIRA. https://issues.apache.org/jira/browse/SPARK-1647 The issue in this case, Kafka can not replay the message as offsets are already committed. Also I think existing KafkaUtils ( The Default High Level Kafka Consumer) also have this issue. Similar discussion is there in this thread also... http://apache-spark-user-list.1001560.n3.nabble.com/Data-loss-Spark-streaming-and-network-receiver-td12337.html As I am thinking, it is possible to tackle this in the consumer code I have written. If we can store the topic partition_id and consumed offset in ZK after every checkpoint , then after Spark recover from the fail over, the present PartitionManager code can start reading from last checkpointed offset ( instead last committed offset as it is doing now) ..In that case it can replay the data since last checkpoint. I will think over it .. Regards, Dibyendu On Mon, Aug 25, 2014 at 11:23 PM, RodrigoB rodrigo.boav...@aspect.com wrote: Hi Dibyendu, My colleague has taken a look at the spark kafka consumer github you have provided and started experimenting. We found that somehow when Spark has a failure after a data checkpoint, the expected re-computations correspondent to the metadata checkpoints are not recovered so we loose Kafka messages and RDD's computations in Spark. The impression is that this code is replacing quite a bit of Spark Kafka Streaming code where maybe (not sure) metadata checkpoints are done every batch interval. Was it on purpose to solely depend on the Kafka commit to recover data and recomputations between data checkpoints? If so, how to make this work? tnks Rod -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12757.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Screencast doesn't show in Chrome on OS X
For the record, I'm using Chrome 36.0.1985.143 on 10.9.4 as well. Maybe it's a Chrome add-on I'm running? Anyway, as Matei pointed out, if I change the https to http, it works fine. On Tue, Aug 26, 2014 at 1:46 AM, Michael Hausenblas michael.hausenb...@gmail.com wrote: https://spark.apache.org/screencasts/1-first-steps-with-spark.html The embedded YouTube video shows up in Safari on OS X but not in Chrome. I'm using Chrome 36.0.1985.143 on MacOS 10.9.4 and it works like a charm for me. Cheers, Michael -- Michael Hausenblas Ireland, Europe http://mhausenblas.info/ On 25 Aug 2014, at 21:55, Nick Chammas nicholas.cham...@gmail.com wrote: https://spark.apache.org/screencasts/1-first-steps-with-spark.html The embedded YouTube video shows up in Safari on OS X but not in Chrome. How come? Nick View this message in context: Spark Screencast doesn't show in Chrome on OS X Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Low Level Kafka Consumer for Spark
Hi Bharat, Thanks for your email. If the Kafka Reader worker process dies, it will be replaced by different machine, and it will start consuming from the offset where it left over ( for each partition). Same case can happen even if I tried to have individual Receiver for every partition. Regards, Dibyendu On Tue, Aug 26, 2014 at 5:43 AM, bharatvenkat bvenkat.sp...@gmail.com wrote: I like this consumer for what it promises - better control over offset and recovery from failures. If I understand this right, it still uses single worker process to read from Kafka (one thread per partition) - is there a way to specify multiple worker processes (on different machines) to read from Kafka? Maybe one worker process for each partition? If there is no such option, what happens when the single machine hosting the Kafka Reader worker process dies and is replaced by a different machine (like in cloud)? Thanks, Bharat -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12788.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Screencast doesn't show in Chrome on OS X
On Tue, Aug 26, 2014 at 10:28 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Maybe it's a Chrome add-on I'm running? Hmm, scratch that. Trying in incognito mode (which disables add-ons, I believe) also yields the same behavior. Nick
Re: Spark SQL Parser error
I have not tried it, but I guess you need to add your credentials in the s3 path. Or, can you copy the jar to your driver node and try again? On Sun, Aug 24, 2014 at 9:35 AM, S Malligarjunan smalligarju...@yahoo.com wrote: Hello Yin, Additional note: with ./bin/spark-shell --jars s3n:/mybucket/myudf.jar I got the following message in the console: "Warning: skipped external jar..." Thanks and Regards, Sankar S.

On , S Malligarjunan smalligarju...@yahoo.com wrote: Hello Yin, I have tried sc.addJar and hiveContext.sparkContext.addJar as well as the ./bin/spark-shell --jars option. With all three options, when I try to create a temporary function I get the ClassNotFoundException. What would be the issue here? Thanks and Regards, Sankar S.

On Saturday, 23 August 2014, 0:53, Yin Huai huaiyin@gmail.com wrote: Hello Sankar, ADD JAR in SQL is not supported at the moment. We are working on it (https://issues.apache.org/jira/browse/SPARK-2219). For now, can you try SparkContext.addJar or using --jars your-jar to launch the spark shell? Thanks, Yin

On Fri, Aug 22, 2014 at 2:01 PM, S Malligarjunan smalligarju...@yahoo.com wrote: Hello Yin/All. @Yin - Thanks for helping. I solved the SQL parser error. I am getting the following exception now:

scala> hiveContext.hql("ADD JAR s3n://hadoop.anonymous.com/lib/myudf.jar")
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
14/08/22 17:58:55 INFO SessionState: converting to local s3n://hadoop.anonymous.com/lib/myudf.jar
14/08/22 17:58:56 ERROR SessionState: Unable to register /tmp/3d273a4c-0494-4bec-80fe-86aa56f11684_resources/myudf.jar
Exception: org.apache.spark.repl.SparkIMain$TranslatingClassLoader cannot be cast to java.net.URLClassLoader
java.lang.ClassCastException: org.apache.spark.repl.SparkIMain$TranslatingClassLoader cannot be cast to java.net.URLClassLoader
    at org.apache.hadoop.hive.ql.exec.Utilities.addToClassPath(Utilities.java:1680)

Thanks and Regards, Sankar S.

On Friday, 22 August 2014, 22:53, S Malligarjunan smalligarju...@yahoo.com.INVALID wrote: Hello Yin, Forgot to mention one thing: the same query works fine in Hive and Shark. Thanks and Regards, Sankar S. On , S Malligarjunan smalligarju...@yahoo.com wrote: Hello Yin, I have tried the CREATE EXTERNAL TABLE command as well. I get the same error. Please help me to find the root cause. Thanks and Regards, Sankar S. On Friday, 22 August 2014, 22:43, Yin Huai huaiyin@gmail.com wrote: Hi Sankar, You need to create an external table in order to specify the location of data (i.e. using CREATE EXTERNAL TABLE user1 ... LOCATION). You can take a look at this page https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-Create/Drop/TruncateTable for reference.
Thanks, Yin

On Thu, Aug 21, 2014 at 11:12 PM, S Malligarjunan smalligarju...@yahoo.com.invalid wrote: Hello All, When I execute the following query:

    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    hiveContext.hql("CREATE TABLE user1 (time string, id string, u_id string, c_ip string, user_agent string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' STORED AS TEXTFILE LOCATION 's3n://hadoop.anonymous.com/output/qa/cnv_px_ip_gnc/ds=2014-06-14/'")

I am getting the following error:

org.apache.spark.sql.hive.HiveQl$ParseException: Failed to parse: CREATE TABLE user1 (time string, id string, u_id string, c_ip string, user_agent string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' LINES TERMINATED BY ' ' STORED AS TEXTFILE LOCATION 's3n://hadoop.anonymous.com/output/qa/cnv_px_ip_gnc/ds=2014-06-14/'
    at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:215)
    at org.apache.spark.sql.hive.HiveContext.hiveql(HiveContext.scala:98)
    at org.apache.spark.sql.hive.HiveContext.hql(HiveContext.scala:102)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:22)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
    at $iwC$$iwC$$iwC.<init>(<console>:29)
    at $iwC$$iwC.<init>(<console>:31)
    at $iwC.<init>(<console>:33)
    at <init>(<console>:35)

Kindly let me know what could be the issue here. I have cloned Spark from GitHub. Using Hadoop 1.0.3. Thanks and Regards, Sankar S.
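Following Yin's advice, a sketch of the corrected statement (columns and path copied from the thread; the stray trailing parenthesis from the original removed):

    hiveContext.hql("""
      CREATE EXTERNAL TABLE user1 (
        time string, id string, u_id string, c_ip string, user_agent string)
      ROW FORMAT DELIMITED
        FIELDS TERMINATED BY '\t'
        LINES TERMINATED BY '\n'
      STORED AS TEXTFILE
      LOCATION 's3n://hadoop.anonymous.com/output/qa/cnv_px_ip_gnc/ds=2014-06-14/'
    """)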
What is a Block Manager?
I'm curious not only about what they do, but what their relationship is to the rest of the system. I find that I get listener events for n block managers added where n is also the number of workers I have available to the application. Is this a stable constant? Also, are there ways to determine at runtime how many workers I have and where they are? Thanks, Victor
Re: Spark webUI - application details page
I have already tried setting the history server and accessing it on master-url:18080 as per the link. But the page does not list any completed applications. As I mentioned in my previous mail, I am running Spark in standalone mode on the cluster (as well as on my local machine). According to the link, it appears that the history server is required only in mesos or yarn mode, not in standalone mode. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-webUI-application-details-page-tp3490p12834.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: unable to instantiate HiveMetaStoreClient on LocalHiveContext
Hello Du, Can you check if there is a dir named "metastore" in the place where you launched your program? If so, can you delete it and try again? Also, can you try HiveContext? LocalHiveContext is deprecated. Thanks, Yin

On Mon, Aug 25, 2014 at 6:33 PM, Du Li l...@yahoo-inc.com.invalid wrote: Hi, I created an instance of LocalHiveContext and attempted to create a database. However, it failed with the message "org.apache.spark.sql.execution.QueryExecutionException: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient". My code is as follows. Similar code worked in spark-shell and also in bin/run-example org.apache.spark.examples.sql.hive.HiveFromSpark.

    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.SparkContext._
    import org.apache.spark.sql.hive.LocalHiveContext

    val conf = new SparkConf(false).setMaster("local").setAppName("test data exchange with Hive")
    conf.set("spark.driver.host", "localhost")
    val sc = new SparkContext(conf)
    val hc = new LocalHiveContext(sc)
    hc.hql("create database if not exists testdb")

The exception was thrown out of the hql call. Did I miss any configuration? Thanks, Du
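A minimal sketch of Yin's suggestion (same statement, just through the non-deprecated context):

    import org.apache.spark.sql.hive.HiveContext

    val hc = new HiveContext(sc)
    hc.hql("create database if not exists testdb")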
Submit to the Powered By Spark Page!
Hi All, I want to invite users to submit to the Spark Powered By page. This page is a great way for people to learn about Spark use cases. Since Spark activity has increased a lot in the higher level libraries and people often ask who uses each one, we'll include information about which components each organization uses as well. If you are interested, simply respond to this e-mail (or e-mail me off-list) with: 1) Organization name 2) URL 3) Which Spark components you use: Core, SQL, Streaming, MLlib, GraphX 4) A 1-2 sentence description of your use case. I'll post any new entries here: https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark - Patrick
Spark Streaming - Small file in HDFS
Hi People, I'm using Java Kafka Spark streaming and saving the result files into HDFS. As per my understanding, Spark streaming writes every processed message or event to an HDFS file. The reason for creating one file per message or event could be to ensure fault tolerance. Is there any way Spark handles this small-file problem, or do I need to append small files into a bigger file and then insert that into HDFS? Appreciate your time and suggestions.
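Not from the thread, but one common mitigation sketch (paths are placeholders): shrink each batch to fewer partitions before writing, so every interval produces one larger file instead of many tiny ones.

    dstream.foreachRDD { (rdd, time) =>
      // one output file per batch interval instead of one per partition
      rdd.coalesce(1).saveAsTextFile(s"hdfs:///output/batch-${time.milliseconds}")
    }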
Re: SPARK Hive Context UDF Class Not Found Exception,
Hello Michael, I have executed git pull now. As per the pom, the version entry is 1.1.0-SNAPSHOT. Thanks and Regards, Sankar S. On Tuesday, 26 August 2014, 1:00, Michael Armbrust mich...@databricks.com wrote: Which version of Spark SQL are you using? Several issues with custom Hive UDFs have been fixed in 1.1. On Mon, Aug 25, 2014 at 9:57 AM, S Malligarjunan smalligarju...@yahoo.com.invalid wrote: Hello All, I have added a jar from an S3 instance into the classpath. I have tried the following options: 1. sc.addJar("s3n://mybucket/lib/myUDF.jar") 2. hiveContext.sparkContext.addJar("s3n://mybucket/lib/myUDF.jar") 3. ./bin/spark-shell --jars s3n://mybucket/lib/myUDF.jar I am getting a ClassNotFoundException when trying to create a temporary function. What would be the issue here? Thanks and Regards, Sankar S.
Re: Only master is really busy at KMeans training
How many partitions now? Btw, which Spark version are you using? I checked your code and I don't understand why you want to broadcast vectors2, which is an RDD. var vectors2 = vectors.repartition(1000).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER) var broadcastVector = sc.broadcast(vectors2) What is the total memory of your cluster? Does the dataset fit into memory? If not, you can try turning on `spark.rdd.compress`. The whole dataset is not small. -Xiangrui On Mon, Aug 25, 2014 at 11:46 PM, durin m...@simon-schaefer.net wrote: With a lower number of partitions, I keep losing executors during collect at KMeans.scala:283 The error message is ExecutorLostFailure (executor lost). The program recovers by automatically repartitioning the whole dataset (126G), which takes very long and seems to only delay the inevitable failure. Is there a recommended solution to this issue? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Only-master-is-really-busy-at-KMeans-training-tp12411p12803.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Only master is really busy at KMeans training
Right now, I have issues even at a far earlier point. I'm fetching data from a registered table via

    var texts = ctx.sql("SELECT text FROM tweetTrainTable LIMIT 2000")
      .map(_.head.toString)
      .persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER) // persisted because it's used again later
    var dict = texts.flatMap(_.split(" ").map(_.toLowerCase())).repartition(80) // 80 = 2 * num_cpu
    var count = dict.count.toInt

As far as I can see, it's the repartitioning that is causing the problems. However, without that, I have only one partition for further RDD operations on dict, so it seems to be necessary. The errors given are:

14/08/26 10:43:52 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.1 (TID 2300, idp11.foo.bar): java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    java.util.Arrays.copyOf(Arrays.java:3230)
    java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
    ...

Then the RDD operations start again, but later I will get

14/08/26 10:47:14 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.2 (TID 2655, idp41.foo.bar): java.lang.NullPointerException:
    $line39.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:26)
    $line39.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:26)
    scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:236)

and another java.lang.OutOfMemoryError. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Only-master-is-really-busy-at-KMeans-training-tp12411p12842.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Kinesis receiver spark streaming partition
We are exploring using Kinesis and Spark Streaming together, and I took a look at the Kinesis receiver code in 1.1.0. I have a question regarding Kinesis partitions vs. Spark Streaming partitions: it seems to be pretty difficult to align them. Kinesis partitions a stream of data into shards. If we follow the example, we will have multiple Kinesis receivers reading from the same stream in Spark Streaming. Kinesis workers coordinate among themselves and assign shards to themselves dynamically, so a particular shard can be consumed by different Kinesis workers (and thus different Spark workers) over time, though not at the same time. Blocks are generated based on time intervals, and RDDs are created and partitioned based on those blocks. In the end, the data for a given shard will be spread across multiple blocks (possibly located on different Spark worker nodes), so we would need to group that data again for a given shard and shuffle data around to achieve the same partitioning we had in Kinesis. Is there a better way to achieve this that avoids reshuffling the data? Thanks, Wei
Re: Spark Screencast doesn't show in Chrome on OS X
It should be fixed now. Maybe you have a cached version of the page in your browser. Open DevTools (cmd-shift-I), press the gear icon, and check disable cache while devtools open, then refresh the page to refresh without cache. Matei On August 26, 2014 at 7:31:18 AM, Nicholas Chammas (nicholas.cham...@gmail.com) wrote: On Tue, Aug 26, 2014 at 10:28 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Maybe it's a Chrome add-on I'm running? Hmm, scratch that. Trying in incognito mode (which disables add-ons, I believe) also yields the same behavior. Nick
Re: Spark Screencast doesn't show in Chrome on OS X
Confirmed. Works now. Thanks Matei. (BTW, on OS X Command + Shift + R also refreshes the page without cache.) On Tue, Aug 26, 2014 at 3:06 PM, Matei Zaharia matei.zaha...@gmail.com wrote: It should be fixed now. Maybe you have a cached version of the page in your browser. Open DevTools (cmd-shift-I), press the gear icon, and check disable cache while devtools open, then refresh the page to refresh without cache. Matei On August 26, 2014 at 7:31:18 AM, Nicholas Chammas ( nicholas.cham...@gmail.com) wrote: On Tue, Aug 26, 2014 at 10:28 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Maybe it's a Chrome add-on I'm running? Hmm, scratch that. Trying in incognito mode (which disables add-ons, I believe) also yields the same behavior. Nick
OutofMemoryError when generating output
Hi, I have the following piece of code that I am running on a cluster with 10 nodes with 2GB memory per node. The tasks seem to complete, but at the point where it is generating output (saveAsTextFile), the program freezes after some time and reports an out of memory error (error transcript attached below). I also tried using collect() and printing the output to the console instead of a file, but got the same error. The program reads some logs for a month and extracts the number of unique users during the month. The reduced output is not very large, so I'm not sure why the memory error occurs. I would appreciate any help in fixing this memory error to get the output. Thanks.

    def main(args: Array[String]) {
      val conf = new SparkConf().setAppName("App")
      val sc = new SparkContext(conf)

      // get the number of users per month
      val user_time = sc.union(sc.textFile(baseFile))
        .map(line => {
          val fields = line.split("\t")
          (fields(11), fields(6))          // extract (month, user_id)
        })
        .groupByKey                         // group by month as the key
        .map(g => (g._1, g._2.toSet.size))  // get the unique id count per month
      // .collect()
      // user_time.foreach(f => println(f))

      user_time.map(f => "%s, %s".format(f._1, f._2)).saveAsTextFile("app_output")
      sc.stop()
    }

14/08/26 15:21:15 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:121)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$4.apply(PairRDDFunctions.scala:107)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$4.apply(PairRDDFunctions.scala:106)
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/OutofMemoryError-when-generating-output-tp12847.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Spark 1.1. doesn't work with hive context
Hello all, I have just checked out branch-1.1 and executed the commands below:

    ./bin/spark-shell --driver-memory 1G
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
    hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
    // Queries are expressed in HiveQL
    hiveContext.hql("FROM src SELECT key, value").collect().foreach(println)

I am getting the following exception:

Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found.
    at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:135)
    at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:175)
    at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45)
    ... 72 more
Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found

Thanks and Regards, Sankar S.
Re: disable log4j for spark-shell
If someone doesn't have the access to do that, is there any easy way to specify a different properties file to be used? Patrick Wendell wrote: If you want to customize the logging behavior, the simplest way is to copy conf/log4j.properties.template to conf/log4j.properties. Then you can go and modify the log level in there. The Spark shells should pick this up. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/disable-log4j-for-spark-shell-tp11278p12850.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
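One hedged possibility, not confirmed in the thread (assumes a spark-submit/spark-shell version that forwards driver JVM options): point log4j at an alternate file with a system property instead of editing conf/.

    # Hypothetical path; log4j reads -Dlog4j.configuration at JVM startup.
    ./bin/spark-shell --driver-java-options "-Dlog4j.configuration=file:/home/me/my-log4j.properties"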
Re: OutofMemoryError when generating output
Hi, The error doesn't occur during saveAsTextFile but rather during the groupByKey, as far as I can tell. We strongly urge users not to use groupByKey if they don't have to. What I would suggest is the following work-around instead:

    sc.textFile(baseFile).map { line =>
      val fields = line.split("\t")
      (fields(11), fields(6)) // extract (month, user_id)
    }.distinct().countByKey()

Best, Burak

- Original Message - From: SK skrishna...@gmail.com To: u...@spark.incubator.apache.org Sent: Tuesday, August 26, 2014 12:38:00 PM Subject: OutofMemoryError when generating output
Re: saveAsTextFile hangs with hdfs
Hi David, Your job is probably hanging on the groupByKey process. Probably GC is kicking in and the process starts to hang, or the data is unbalanced and you end up with stragglers (once GC kicks in, you'll start to get the connection errors you shared). If you don't care about the list of values itself, but the count of it (that appears to be what you're trying to save, correct me if I'm wrong), then I would suggest using `countByKey()` directly on `JavaPairRDD<String, AnalyticsLogFlyweight> partitioned`. Best, Burak

- Original Message - From: David david.b...@gmail.com To: user u...@spark.incubator.apache.org Sent: Tuesday, August 19, 2014 1:44:18 PM Subject: saveAsTextFile hangs with hdfs

I have a simple spark job that seems to hang when saving to hdfs. When looking at the spark web ui, the job reached 97 of 100 tasks completed. I need some help determining why the job appears to hang. The job hangs on the saveAsTextFile() call. https://www.dropbox.com/s/fdp7ck91hhm9w68/Screenshot%202014-08-19%2010.53.24.png The job is pretty simple:

    JavaRDD<String> analyticsLogs = context
            .textFile(Joiner.on(",").join(hdfs.glob("/spark-dfs", ".*\\.log$")), 4);

    JavaRDD<AnalyticsLogFlyweight> flyweights = analyticsLogs
            .map(line -> {
                try {
                    AnalyticsLog log = GSON.fromJson(line, AnalyticsLog.class);
                    AnalyticsLogFlyweight flyweight = new AnalyticsLogFlyweight();
                    flyweight.ipAddress = log.getIpAddress();
                    flyweight.time = log.getTime();
                    flyweight.trackingId = log.getTrackingId();
                    return flyweight;
                } catch (Exception e) {
                    LOG.error("error parsing json", e);
                    return null;
                }
            });

    JavaRDD<AnalyticsLogFlyweight> filtered = flyweights
            .filter(log -> log != null);

    JavaPairRDD<String, AnalyticsLogFlyweight> partitioned = filtered
            .mapToPair((AnalyticsLogFlyweight log) -> new Tuple2(log.trackingId, log))
            .partitionBy(new HashPartitioner(100)).cache();

    Ordering<AnalyticsLogFlyweight> ordering =
            Ordering.natural().nullsFirst().onResultOf(new Function<AnalyticsLogFlyweight, Long>() {
                public Long apply(AnalyticsLogFlyweight log) {
                    return log.time;
                }
            });

    JavaPairRDD<String, Iterable<AnalyticsLogFlyweight>> stringIterableJavaPairRDD =
            partitioned.groupByKey();

    JavaPairRDD<String, Integer> stringIntegerJavaPairRDD =
            stringIterableJavaPairRDD.mapToPair((log) -> {
                List<AnalyticsLogFlyweight> sorted = Lists.newArrayList(log._2());
                sorted.forEach(l -> LOG.info("sorted {}", l));
                return new Tuple2(log._1(), sorted.size());
            });

    String outputPath = "/summarized/groupedByTrackingId4";
    hdfs.rm(outputPath, true);
    stringIntegerJavaPairRDD.saveAsTextFile(String.format("%s/%s", hdfs.getUrl(), outputPath));

Thanks in advance, David
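A small sketch of Burak's countByKey suggestion, in Scala for brevity (the thread's code is Java; the name partitioned follows the snippet above):

    // counts per trackingId, without materializing the grouped value lists
    val counts: scala.collection.Map[String, Long] = partitioned.countByKey()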
Re: Out of memory on large RDDs
Hi Grega, Did you ever get this figured out? I'm observing the same issue in Spark 1.0.2. For me it was after 1.5hr of a large .distinct call, followed by a .saveAsTextFile() 14/08/26 20:57:43 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 18500 14/08/26 20:57:43 INFO executor.Executor: Running task ID 18500 14/08/26 20:57:43 INFO storage.BlockManager: Found block broadcast_0 locally 14/08/26 20:57:43 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 0, fetching them 14/08/26 20:58:13 ERROR executor.Executor: Exception in task ID 18491 org.apache.spark.SparkException: Error communicating with MapOutputTracker at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:108) at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:155) at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:42) at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:65) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:105) ... 23 more On Tue, Mar 11, 2014 at 3:07 PM, Grega Kespret gr...@celtra.com wrote: Your input data read as RDD may be causing OOM, so thats where you can use different memory configuration. We are not getting any OOM exceptions, just akka future timeouts in mapoutputtracker and unsuccessful get of shuffle outputs, therefore refetching them. What is the industry practice when going about debugging such errors? Questions: - why are mapoutputtrackers timing out? ( and how to debug this properly?) - what is the task/purpose of mapoutputtracker? - how to check per-task objects size? Thanks, Grega On 11 Mar 2014, at 18:43, Mayur Rustagi mayur.rust...@gmail.com wrote: Shuffle data is always stored on disk, its unlikely to cause OOM. 
Your input data read as RDD may be causing OOM, so that's where you can use a different memory configuration. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Tue, Mar 11, 2014 at 9:20 AM, sparrow do...@celtra.com wrote: I don't understand how exactly that will help. There are no persisted RDDs in storage. Our input data is ~100 GB, but the output of the flatMap is ~40 MB. The small RDD is then persisted. Memory configuration should not affect shuffle data if I understand you correctly? On Tue, Mar 11, 2014 at 4:52 PM, Mayur Rustagi [via Apache Spark User List] wrote: Shuffle data is not kept in memory. Did you try additional memory configurations ( https://spark.incubator.apache.org/docs/latest/scala-programming-guide.html#rdd-persistence )? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Tue, Mar 11, 2014 at 8:35 AM, Domen Grabec [hidden email]
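The "Futures timed out after [30 seconds]" in the trace above is the 1.x default for spark.akka.askTimeout. Raising it, along with the Akka frame size that bounds the map-output status payload, will not fix underlying GC or shuffle pressure, but it can stop the refetch loop long enough to finish. A sketch, with values picked arbitrarily:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("LargeDistinct")
      .set("spark.akka.askTimeout", "120")  // seconds; the 1.x default is 30
      .set("spark.akka.frameSize", "64")    // MB; the 1.x default is 10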
Re: Does HiveContext support Parquet?
Hi Silvio, I re-downloaded hive-0.12-bin and reset the related paths in spark-env.sh. However, I still get an error. Do you happen to know which step I did wrong? Thank you! My detailed steps are as follows:

# enter spark-shell (successful)
/bin/spark-shell --master spark://S4:7077 --jars /home/hduser/parquet-hive-bundle-1.5.0.jar

# import the related hiveContext (successful)
...

# create a parquet table:
hql("CREATE TABLE parquet_test (id int, str string, mp MAP<STRING,STRING>, lst ARRAY<STRING>, strct STRUCT<A:STRING,B:STRING>) PARTITIONED BY (part string) ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat' OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'")

I get this error:

14/08/26 21:59:20 ERROR exec.DDLTask: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.hive.ql.io.parquet.serde.primitive.ParquetPrimitiveInspectorFactory
    at org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector.getObjectInspector(ArrayWritableObjectInspector.java:77)
    at org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector.<init>(ArrayWritableObjectInspector.java:59)
    at org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe.initialize(ParquetHiveSerDe.java:113)
    at org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:218)
    at org.apache.hadoop.hive.ql.metadata.Table.getDeserializerFromMetaStore(Table.java:272)
    at org.apache.hadoop.hive.ql.metadata.Table.getDeserializer(Table.java:265)
    at org.apache.hadoop.hive.ql.metadata.Table.getCols(Table.java:597)
    at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:576)
    at org.apache.hadoop.hive.ql.exec.DDLTask.createTable(DDLTask.java:3661)
    at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:252)
    at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:151)
    at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:65)
    at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1414)
    at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1192)
    at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1020)
    at org.apache.hadoop.hive.ql.Driver.run(Driver.java:888)
    at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:186)
    at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:160)
    at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:250)
    at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:247)
    at org.apache.spark.sql.hive.HiveContext.hiveql(HiveContext.scala:85)
    at org.apache.spark.sql.hive.HiveContext.hql(HiveContext.scala:90)
    at $line34.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:18)
    at $line34.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:23)
    at $line34.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
    at $line34.$read$$iwC$$iwC$$iwC.<init>(<console>:27)
    at $line34.$read$$iwC$$iwC.<init>(<console>:29)
    at $line34.$read$$iwC.<init>(<console>:31)
    at $line34.$read.<init>(<console>:33)
    at $line34.$read$.<init>(<console>:37)
    at $line34.$read$.<clinit>(<console>)
    at $line34.$eval$.<init>(<console>:7)
    at $line34.$eval$.<clinit>(<console>)
    at $line34.$eval.$print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
    at
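A possible workaround while the serde issue is sorted out: Spark SQL (1.0+) can read and write Parquet natively, without going through parquet-hive-bundle at all. A sketch, with a hypothetical path:

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)
    // parquetFile returns a SchemaRDD with the schema read from the files
    val records = hiveContext.parquetFile("hdfs:///data/parquet_test")  // path is made up
    records.registerAsTable("parquet_test")  // renamed registerTempTable in 1.1+
    hiveContext.hql("SELECT count(*) FROM parquet_test").collect().foreach(println)
    records.saveAsParquetFile("hdfs:///data/parquet_test_copy")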
Re: countByWindow save the count ?
Thanks. I'm just confused about the syntax; I'm not sure which variable holds the value of the count so that I can save it. Any examples or tips? On Mon, Aug 25, 2014 at 9:49 PM, Daniil Osipov daniil.osi...@shazam.com wrote: You could try to use foreachRDD on the result of countByWindow with a function that performs the save operation. On Fri, Aug 22, 2014 at 1:58 AM, Josh J joshjd...@gmail.com wrote: Hi, Hopefully a simple question. Is there an example of where to save the output of countByWindow? I would like to save the results to external storage (Kafka or Redis). The examples show only stream.print(). Thanks, Josh
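Concretely, countByWindow yields a DStream[Long] with one element per slide interval, so the count is not sitting in a variable you can read directly; you pull it out inside foreachRDD. A sketch, where saveCount(...) is a made-up stand-in for your Redis or Kafka write:

    // window ops with an inverse reduce (countByWindow included) need a checkpoint dir
    ssc.checkpoint("/tmp/streaming-checkpoint")

    val counts = stream.countByWindow(Seconds(60), Seconds(10))
    counts.foreachRDD { (rdd, time) =>
      rdd.collect().foreach { c =>
        saveCount(time.milliseconds, c)  // hypothetical helper writing to Redis/Kafka
      }
    }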
Re: Parsing Json object definition spanning multiple lines
i've seen this done using mapPartitions() where each partition represents a single, multi-line json file. you can rip through each partition (json file) and parse the json doc as a whole. this assumes you use sc.textFile("path/*.json") or equivalent to load in multiple files at once. each json file will be a partition. not sure if this satisfies your use case, but might be a good starting point. -chris On Mon, Jul 14, 2014 at 2:55 PM, SK skrishna...@gmail.com wrote: Hi, I have a json file where the definition of each object spans multiple lines. An example of one object definition appears below.

    { "name": "16287e9cdf", "width": 500, "height": 325, "width": 1024, "height": 665,
      "obj": [
        { "x": 395.08, "y": 82.09, "w": 185.48677, "h": 185.48677,
          "min": 50, "max": 59, "attr1": 2, "attr2": 68, "attr3": 8 },
        { "x": 519.1, "y": 225.8, "w": 170, "h": 171,
          "min": 20, "max": 29, "attr1": 7, "attr2": 93, "attr3": 10 }
      ] }

I used the following Spark code to parse the file. However, the parsing is failing, I think because it expects one Json object definition per line. I can try to preprocess the input file to remove the new lines, but I would like to know if it is possible to parse a Json object definition that spans multiple lines, directly in Spark.

    val inp = sc.textFile(args(0))
    val res = inp.map(line => { parse(line) })
                 .map(json => {
                    implicit lazy val formats = org.json4s.DefaultFormats
                    val image = (json \ "name").extract[String]
                 })

Thanks for your help. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Parsing-Json-object-definition-spanning-multiple-lines-tp9659.html
Re: Spark-Streaming collect/take functionality.
good suggestion, td. and i believe the optimization that jon.burns is referring to - from the big data mini course - is a step earlier: the sorting mechanism that produces sortedCounts. you can use mapPartitions() to get a top k locally on each partition, then shuffle only (k * # of partitions) elements to the driver for sorting - versus shuffling the whole dataset from all partitions. a network-IO-saving technique. On Tue, Jul 15, 2014 at 9:41 AM, jon.burns jon.bu...@uleth.ca wrote: It works perfectly, thanks! I feel like I should have figured that out; I'll chalk it up to inexperience with Scala. Thanks again. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-collect-take-functionality-tp9670p9772.html
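In code, the local-top-k idea looks roughly like this; `counts` is assumed to be an RDD[(String, Int)] of word counts:

    val k = 10
    val topK = counts
      .mapPartitions { iter =>
        // keep only each partition's k largest elements
        iter.toSeq.sortBy(-_._2).take(k).iterator
      }
      .collect()   // at most k * numPartitions records cross the network
      .sortBy(-_._2)
      .take(k)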
Specifying classpath
Hello, I'm using the following version of Spark - 1.0.0+cdh5.1.0+41 (1.cdh5.1.0.p0.27). I've tried to specify the libraries Spark uses in the following ways:
1) Adding them to the spark context
2) Specifying the jar path in
   a) spark.executor.extraClassPath
   b) spark.executor.extraLibraryPath
3) Copying the libraries to spark/lib
4) Specifying the path in SPARK_CLASSPATH, even SPARK_LIBRARY_PATH
5) Passing them as the --jars argument
but the Spark application is not able to pick up the libraries, although I can see the message "SparkContext: Added JAR file". I get a NoClassDefFoundError. The only way I've been able to make it work so far is by merging my application jar with all the library jars. What might be going on? I need this right now to specify hbase-protocol-0.98.1-cdh5.1.0.jar in SPARK_CLASSPATH, as mentioned here: https://issues.apache.org/jira/browse/HBASE-10877. I'm using spark-submit to submit the job. Thanks, Ashish
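Worth noting for the HBASE-10877 case specifically: --jars adds jars to the executor's application classloader after the JVM is up, which is likely too late for hbase-protocol (its protobuf classes need to be on the system classpath), so the extraClassPath route with the jar pre-placed on every node is probably the one that has to work. A sketch, with a made-up jar location:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("HBaseJob")
      // the jar must already exist at this exact path on every worker node
      .set("spark.executor.extraClassPath",
           "/opt/jars/hbase-protocol-0.98.1-cdh5.1.0.jar")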
Upgrading 1.0.0 to 1.0.2
I wanted to make sure that there's full compatibility between minor releases. I have a project that has a dependency on spark-core so that it can be a driver program and that I can test locally. However, when connecting to a cluster you don't necessarily know what version you're connecting to. Is a 1.0.0 cluster binary compatible with a 1.0.2 driver program? Is a 1.0.0 driver program binary compatible with a 1.0.2 cluster?
Re: Spark - GraphX pregel like with global variables (accumulator / broadcast)
At 2014-08-26 01:20:09 -0700, BertrandR bertrand.rondepierre...@gmail.com wrote: I actually tried without unpersisting, but given the performance I tried to add these in order to free the memory. After your answer I tried to remove them again, but without any change in the execution time... This is probably a related issue: in Spark you have to explicitly cache any dataset that you use more than once. Otherwise it will be recomputed each time it's used, which can cause an exponential slowdown for certain dependency structures. To be safe, you could start by caching g, msg, and newVerts every time they are set. Ankur
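The pattern Ankur describes, as a minimal sketch on plain RDDs (names made up): materialize each iteration's result before unpersisting the previous one, so a later action can never force a recomputation from the top of the lineage:

    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD

    def iterate(init: RDD[(Long, Double)], msgs: RDD[(Long, Double)], iters: Int): RDD[(Long, Double)] = {
      var current = init.cache()
      for (_ <- 1 to iters) {
        val next = current.join(msgs)
          .mapValues { case (value, msg) => value + msg }
          .cache()
        next.count()                        // force materialization first
        current.unpersist(blocking = false) // then drop the previous iteration
        current = next
      }
      current
    }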
Re: Upgrading 1.0.0 to 1.0.2
Is this a standalone mode cluster? We don't currently make this guarantee, though it will likely work in 1.0.0 to 1.0.2. The problem though is that the standalone mode grabs the executors' version of Spark code from what's installed on the cluster, while your driver might be built against another version. On YARN and Mesos, you can more easily mix different versions of Spark, since each application ships its own Spark JAR (or references one from a URL), and this is used for both the driver and executors. Matei On August 26, 2014 at 6:10:57 PM, Victor Tso-Guillen (v...@paxata.com) wrote: ...
Re: Parsing Json object definition spanning multiple lines
You can use sc.wholeTextFiles to read each file as a complete String, though it requires each file to be small enough for one task to process. On August 26, 2014 at 4:01:45 PM, Chris Fregly (ch...@fregly.com) wrote: ...
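Putting this suggestion together with the json4s code from the original post, a sketch with a hypothetical input path:

    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    // one (filename, fullContents) pair per file, so a JSON document that
    // spans multiple lines is parsed as a whole
    val names = sc.wholeTextFiles("hdfs:///data/*.json").map { case (_, contents) =>
      implicit val formats = org.json4s.DefaultFormats
      (parse(contents) \ "name").extract[String]
    }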
Re: Upgrading 1.0.0 to 1.0.2
Yes, we are standalone right now. Do you have literature on why one would want to consider Mesos or YARN for Spark deployments? Sounds like I should try upgrading my project and seeing if everything compiles without modification. Then I can connect to an existing 1.0.0 cluster and see what happens... Thanks, Matei :) On Tue, Aug 26, 2014 at 6:37 PM, Matei Zaharia matei.zaha...@gmail.com wrote: ...
CUDA in spark, especially in MLlib?
Hi I am trying to find a CUDA library in Scala, to see if some matrix manipulation in MLlib can be sped up. I googled a few but found no active projects on Scala+CUDA. Python is supported by CUDA though. Any suggestion on whether this idea makes any sense? Best regards, Wei
RE: What is a Block Manager?
Basically, a Block Manager manages the storage for most of the data in Spark; to name a few: blocks that represent cached RDD partitions, intermediate shuffle data, broadcast data, etc. It is per executor, and in standalone mode you normally have one executor per worker. You don't control how many workers you have at runtime, but you can manage how many executors your application will launch. Check each running mode's documentation for details (but control where they run? Hardly; YARN mode does some work based on data locality, but this is done by the framework, not the user program). Best Regards, Raymond Liu From: Victor Tso-Guillen [mailto:v...@paxata.com] Sent: Tuesday, August 26, 2014 11:42 PM To: user@spark.apache.org Subject: What is a Block Manager? I'm curious not only about what they do, but what their relationship is to the rest of the system. I find that I get listener events for n block managers added, where n is also the number of workers I have available to the application. Is this a stable constant? Also, are there ways to determine at runtime how many workers I have and where they are? Thanks, Victor
Re: CUDA in spark, especially in MLlib?
You should try to find a Java-based library, then you can call it from Scala. Matei On August 26, 2014 at 6:58:11 PM, Wei Tan (w...@us.ibm.com) wrote: ...
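Calling a Java library from Scala needs no glue code at all. For example, jblas (the native-BLAS wrapper MLlib itself depended on at the time) works directly, and a CUDA-backed Java binding such as JCuda could be invoked the same way. A sketch of the Java-from-Scala part only:

    import org.jblas.DoubleMatrix

    // 2x2 matrices, elements given in column-major order
    val a = new DoubleMatrix(2, 2, 1.0, 2.0, 3.0, 4.0)
    val b = new DoubleMatrix(2, 2, 5.0, 6.0, 7.0, 8.0)
    val c = a.mmul(b)  // matrix-matrix multiply via native BLAS
    println(c)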
Re: Upgrading 1.0.0 to 1.0.2
Things will definitely compile, and apps compiled on 1.0.0 should even be able to link against 1.0.2 without recompiling. The only problem is if you run your driver with 1.0.0 on its classpath, but the cluster has 1.0.2 in executors. For Mesos and YARN vs standalone, the difference is that they just have more features, at the expense of more complicated setup. For example, they have richer support for cross-application sharing (see https://spark.apache.org/docs/latest/job-scheduling.html), and the ability to run non-Spark applications on the same cluster. Matei On August 26, 2014 at 6:53:33 PM, Victor Tso-Guillen (v...@paxata.com) wrote: ...
Ask for help: how to integrate Spark Streaming and IBM MQ
Hi, I am working on a project with the following scenario: we will use Spark Streaming to receive data from IBM MQ. I checked the Streaming API documentation, and it only supports ZeroMQ, Kafka, etc. I have some questions: 1. Can we use the MQTT protocol to get data in this scenario? Is there any other way to connect to IBM MQ? 2. Why doesn't Spark Streaming support a common protocol like JMS? Thanks in advance; I appreciate your help! 35597...@qq.com
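On question 1: the external MQTT module (org.apache.spark.streaming.mqtt.MQTTUtils.createStream) should work against IBM MQ if its MQTT/telemetry channel is enabled, though I haven't verified that combination. On question 2: nothing stops you from writing a custom JMS receiver yourself; the Receiver API is public. A rough sketch (the IBM MQ connection-factory class in the usage comment is assumed, not verified; check the MQ JMS client docs):

    import javax.jms._
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    class JmsReceiver(factory: () => ConnectionFactory, queue: String)
        extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

      def onStart(): Unit = {
        new Thread("JMS Receiver") { override def run(): Unit = receive() }.start()
      }

      def onStop(): Unit = {}  // receive() exits once isStopped() is true

      private def receive(): Unit = {
        var conn: Connection = null
        try {
          conn = factory().createConnection()
          val session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)
          val consumer = session.createConsumer(session.createQueue(queue))
          conn.start()
          while (!isStopped()) {
            consumer.receive(1000) match {      // returns null on timeout
              case m: TextMessage => store(m.getText)
              case _              =>            // skip nulls and non-text messages
            }
          }
        } catch {
          case e: JMSException => restart("JMS error, restarting", e)
        } finally {
          if (conn != null) conn.close()
        }
      }
    }

    // usage (MQQueueConnectionFactory here is an assumption):
    // ssc.receiverStream(new JmsReceiver(
    //   () => new com.ibm.mq.jms.MQQueueConnectionFactory(), "MY.QUEUE"))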
Re: Upgrading 1.0.0 to 1.0.2
Hi Victor, the issue with running different versions in the driver and on the cluster is that the master will shut down your application due to an inconsistent serialVersionUID in ExecutorState. Best, -- Nan Zhu On Tuesday, August 26, 2014 at 10:10 PM, Matei Zaharia wrote: ...
Execute HiveFromSpark ERROR.
Hi, all: I tried to use Spark SQL in the spark-shell, as in the Spark examples. When I execute:

    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    import hiveContext._
    hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

it reports an error like below:

scala> hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
14/08/27 11:08:19 INFO ParseDriver: Parsing command: CREATE TABLE IF NOT EXISTS src (key INT, value STRING)
14/08/27 11:08:19 INFO ParseDriver: Parse Completed
14/08/27 11:08:19 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/08/27 11:08:19 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
14/08/27 11:08:19 INFO Analyzer: Max iterations (2) reached for batch Check Analysis
14/08/27 11:08:19 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
14/08/27 11:08:19 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
14/08/27 11:08:19 INFO Driver: <PERFLOG method=Driver.run>
14/08/27 11:08:19 INFO Driver: <PERFLOG method=TimeToSubmit>
14/08/27 11:08:19 INFO Driver: <PERFLOG method=compile>
14/08/27 11:08:19 INFO Driver: <PERFLOG method=parse>
14/08/27 11:08:19 INFO ParseDriver: Parsing command: CREATE TABLE IF NOT EXISTS src (key INT, value STRING)
14/08/27 11:08:19 INFO ParseDriver: Parse Completed
14/08/27 11:08:19 INFO Driver: </PERFLOG method=parse start=1409108899822 end=1409108899822 duration=0>
14/08/27 11:08:19 INFO Driver: <PERFLOG method=semanticAnalyze>
14/08/27 11:08:19 INFO SemanticAnalyzer: Starting Semantic Analysis
14/08/27 11:08:19 INFO SemanticAnalyzer: Creating table src position=27
14/08/27 11:08:19 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
14/08/27 11:08:19 INFO ObjectStore: ObjectStore, initialize called
14/08/27 11:08:20 WARN General: Plugin (Bundle) "org.datanucleus" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/home/spark/spark/lib_managed/jars/datanucleus-core-3.2.2.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/home/spark/spark-1.0.2-2.0.0-mr1-cdh-4.2.1/lib_managed/jars/datanucleus-core-3.2.2.jar".
14/08/27 11:08:20 WARN General: Plugin (Bundle) "org.datanucleus.store.rdbms" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/home/spark/spark-1.0.2-2.0.0-mr1-cdh-4.2.1/lib_managed/jars/datanucleus-rdbms-3.2.1.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/home/spark/spark/lib_managed/jars/datanucleus-rdbms-3.2.1.jar".
14/08/27 11:08:20 WARN General: Plugin (Bundle) "org.datanucleus.api.jdo" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/home/spark/spark/lib_managed/jars/datanucleus-api-jdo-3.2.1.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/home/spark/spark-1.0.2-2.0.0-mr1-cdh-4.2.1/lib_managed/jars/datanucleus-api-jdo-3.2.1.jar".
14/08/27 11:08:20 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored
org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table src
    at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:958)
    at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:905)
    at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:8999)
    at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:8313)
    at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:284)
    at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:441)
    at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:342)
    at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:977)
    at org.apache.hadoop.hive.ql.Driver.run(Driver.java:888)
    at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:189)
    at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:163)
    at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35)
    at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35)
    at org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:38)
    at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:250)
    at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:250)
    at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
    at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:104)
    at org.apache.spark.sql.hive.HiveContext.hiveql(HiveContext.scala:75)
    at org.apache.spark.sql.hive.HiveContext.hql(HiveContext.scala:78)
    at $line13.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:16)
    at $line13.$read$$iwC$$iwC$$iwC.<init>(<console>:21)
    at
Re: Low Level Kafka Consumer for Spark
great work, Dibyendu. looks like this would be a popular contribution. expanding on bharat's question a bit: what happens if you submit multiple receivers to the cluster by creating and unioning multiple DStreams, as in the kinesis example here: https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala#L123 for more context, the kinesis implementation above uses the Kinesis Client Library (KCL) to automatically assign - and load balance - stream shards among all KCL threads from all receivers (potentially coming and going as nodes die) on all executors/nodes, using DynamoDB as the association data store. ZooKeeper would be used for your Kafka consumers, of course, with ZooKeeper watches to handle the ephemeral nodes. and I see you're using Curator, which makes things easier. as bharat suggested, running multiple receivers/dstreams may be desirable from a scalability and fault tolerance standpoint. is this type of load balancing possible among your different Kafka consumers running in different ephemeral JVMs? and isn't it fun proposing a popular piece of code? the question floodgates have opened! haha. :) -chris On Tue, Aug 26, 2014 at 7:29 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi Bharat, Thanks for your email. If the Kafka Reader worker process dies, it will be replaced by a different machine, and it will start consuming from the offset where it left off (for each partition). The same can happen even if I try to have an individual Receiver for every partition. Regards, Dibyendu On Tue, Aug 26, 2014 at 5:43 AM, bharatvenkat bvenkat.sp...@gmail.com wrote: I like this consumer for what it promises - better control over offset and recovery from failures. If I understand this right, it still uses a single worker process to read from Kafka (one thread per partition) - is there a way to specify multiple worker processes (on different machines) to read from Kafka? Maybe one worker process for each partition? If there is no such option, what happens when the single machine hosting the Kafka Reader worker process dies and is replaced by a different machine (like in the cloud)? Thanks, Bharat -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12788.html
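in code, the multiple-receiver pattern from the kinesis example translates to roughly this; KafkaLowLevelReceiver is a stand-in name for the receiver class under discussion, and its constructor arguments are made up:

    val numReceivers = 3  // each receiver occupies one executor core
    val streams = (1 to numReceivers).map { _ =>
      ssc.receiverStream(new KafkaLowLevelReceiver(zkQuorum, groupId, topic))
    }
    // a single unioned DStream; downstream processing parallelism is then
    // independent of the number of receivers
    val unified = ssc.union(streams)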
Re: Spark Streaming Output to DB
I would suggest using the JDBC connector in mapPartitions() instead of map(), as JDBC connections are costly and can really impact your performance. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Tue, Aug 26, 2014 at 6:45 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Yes, you can open a jdbc connection at the beginning of the map method, then close this connection at the end of map(), and in between you can use this connection. Thanks Best Regards On Tue, Aug 26, 2014 at 6:12 PM, Ravi Sharma raviprincesha...@gmail.com wrote: Hello People, I'm using Java Spark Streaming. I'm just wondering, can I make a simple jdbc connection in the JavaDStream map() method? Or do I need to create a jdbc connection for each JavaPairDStream, after the map task? Kindly give your thoughts. Cheers, Ravi Sharma
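A sketch of that advice for the streaming case (connection details and table are made up): one connection per partition rather than per record, opened inside the task so it never has to be serialized. `wordCounts` is assumed to be a DStream[(String, Long)]:

    import java.sql.DriverManager

    wordCounts.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        val conn = DriverManager.getConnection("jdbc:mysql://db:3306/analytics", "user", "pass")
        try {
          val stmt = conn.prepareStatement("INSERT INTO counts (word, cnt) VALUES (?, ?)")
          records.foreach { case (word, count) =>
            stmt.setString(1, word)
            stmt.setLong(2, count)
            stmt.executeUpdate()
          }
        } finally {
          conn.close()   // one open/close per partition, not per record
        }
      }
    }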
Re: What is a Block Manager?
We're a single-app deployment so we want to launch as many executors as the system has workers. We accomplish this by not configuring the max for the application. However, is there really no way to inspect what machines/executor ids/number of workers/etc. are available in context? I'd imagine that there'd be something in the SparkContext or in the listener, but all I see in the listener is block managers getting added and removed. Wouldn't one care about the workers getting added and removed at least as much as about block managers? On Tue, Aug 26, 2014 at 6:58 PM, Liu, Raymond raymond@intel.com wrote: ...
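For the runtime-inspection question, two hooks that exist in 1.0.x; a sketch:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockManagerAdded, SparkListenerBlockManagerRemoved}

    // event-driven: one BlockManager per executor, so these events track executors
    sc.addSparkListener(new SparkListener {
      override def onBlockManagerAdded(added: SparkListenerBlockManagerAdded): Unit =
        println(s"executor up: ${added.blockManagerId.hostPort}")
      override def onBlockManagerRemoved(removed: SparkListenerBlockManagerRemoved): Unit =
        println(s"executor down: ${removed.blockManagerId}")
    })

    // polling: a snapshot of the block managers currently registered
    sc.getExecutorStorageStatus.foreach(s => println(s.blockManagerId.host))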