spark disk-to-disk
I would like to use Spark for some algorithms where I make no attempt to work in memory: read from HDFS and write to HDFS for every step. Of course, I would like every step to be evaluated only once. And I have no need for Spark's RDD lineage info, since I persist to reliable storage. The trouble is, I am not sure how to proceed. rdd.checkpoint() seems like the obvious candidate to force my computations to write intermediate data to HDFS and cut the lineage, but rdd.checkpoint() does not actually trigger a job; it runs after some other action has triggered a job, leading to recomputation. The suggestion in the docs is to do rdd.cache(); rdd.checkpoint(), but that won't work for me since the data does not fit in memory. Instead I could do rdd.persist(StorageLevel.DISK_ONLY_2); rdd.checkpoint(), but that leads to the data being written to disk twice in a row, which seems wasteful. So finally I can resort to rdd.saveAsObjectFile(...) followed by sc.objectFile(...), but that seems like a rather broken abstraction. Any ideas? I feel like I am missing something obvious. Or am I running yet again into Spark's historical in-memory bias?
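For concreteness, a minimal sketch of the save/reload pattern described above, assuming an existing SparkContext and hypothetical HDFS paths; each step runs exactly once, and the reloaded RDD carries no lineage back to the previous step:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("disk-to-disk"))
val input = sc.textFile("hdfs:///data/input")

// Step 1: transform and materialize to HDFS (saveAsObjectFile triggers a job)...
input.map(_.toUpperCase).saveAsObjectFile("hdfs:///tmp/step1")
// ...then reload as a fresh RDD whose lineage starts at HDFS.
val step1 = sc.objectFile[String]("hdfs:///tmp/step1")
```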
Re: converting DStream[String] into RDD[String] in spark streaming
Sean, DStream.saveAsTextFiles internally calls foreachRDD and saveAsTextFile for each interval:

def saveAsTextFiles(prefix: String, suffix: String = "") {
  val saveFunc = (rdd: RDD[T], time: Time) => {
    val file = rddToFileName(prefix, suffix, time)
    rdd.saveAsTextFile(file)
  }
  this.foreachRDD(saveFunc)
}

val sparkConf = new SparkConf().setAppName("TwitterRawJSON")
val ssc = new StreamingContext(sparkConf, Seconds(30))
stream.saveAsTextFiles("hdfs://localhost:9000/twitterRawJSON")

1) If there are no sliding window calls in this streaming context, will there be just one file written per interval?

2) If there is a sliding window call in the same context, such as

val hashTags = stream.flatMap(json => DataObjectFactory.createStatus(json).getText.split(" ").filter(_.startsWith("#")))
val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(600))
  .map { case (topic, count) => (count, topic) }
  .transform(_.sortByKey(false))

will some files get written multiple times (as long as the interval is in the batch)?

Deenar

DStream.foreachRDD gives you an RDD[String] for each interval, of course. I don't think it makes sense to say a DStream can be converted into one RDD, since it is a stream. The past elements are inherently not supposed to stick around for a long time, and future elements aren't known. You may consider saving each RDD[String] to HDFS, and then simply loading it from HDFS as an RDD[String].
Spark sql thrift server slower than hive
We have Cloudera CDH 5.3 installed on one machine. We are trying to use the Spark SQL thrift server to execute some analysis queries against a Hive table. Without any changes to the configuration, we run the following query on both Hive and the Spark SQL thrift server:

select * from tableName;

The time taken by Spark is larger than the time taken by Hive, which is not supposed to be like that. The Hive table is mapped to JSON files stored in an HDFS directory, and we are using org.openx.data.jsonserde.JsonSerDe for serialization/deserialization. Why does Spark take much more time to execute the query than Hive?
Re: netlib-java cannot load native lib in Windows when using spark-submit
Hi Ted, I have tried to invoke the command from both the cygwin environment and the powershell environment. I still get the messages:

15/03/22 21:56:00 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
15/03/22 21:56:00 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS

From the Spark UI, I can see: spark.driver.extraLibrary c:\openblas

Thanks, David

On Sun, Mar 22, 2015 at 11:45 AM Ted Yu yuzhih...@gmail.com wrote: Can you try the --driver-library-path option? spark-submit --driver-library-path /opt/hadoop/lib/native ... Cheers

On Sat, Mar 21, 2015 at 4:58 PM, Xi Shen davidshe...@gmail.com wrote: Hi, I use the OpenBLAS DLL, and have configured my application to work in my IDE. When I start my Spark application from the IntelliJ IDE, I can see in the log that the native lib is loaded successfully. But if I use spark-submit to start my application, the native lib still cannot be loaded. I saw the WARN message that it failed to load both the native and native-ref library. I checked the Environment tab in the Spark UI, and the java.library.path is set correctly. Thanks, David
Re: How to set Spark executor memory?
OK, I actually got the answer days ago from StackOverflow, but I did not check it :( When running in local mode, to set the executor memory:

- when using spark-submit, use --driver-memory
- when running as a Java application, e.g. executing from an IDE, set the -Xmx VM option

Thanks, David

On Sun, Mar 22, 2015 at 2:10 PM Ted Yu yuzhih...@gmail.com wrote: bq. the BLAS native cannot be loaded Have you tried specifying the --driver-library-path option? Cheers

On Sat, Mar 21, 2015 at 4:42 PM, Xi Shen davidshe...@gmail.com wrote: Yeah, I think it is harder to troubleshoot the properties issues in an IDE. But the reason I stick to the IDE is that if I use spark-submit, the BLAS native lib cannot be loaded. Maybe I should open another thread to discuss that. Thanks, David

On Sun, 22 Mar 2015 10:38 Xi Shen davidshe...@gmail.com wrote: In the log, I saw: MemoryStorage: MemoryStore started with capacity 6.7GB But I still cannot find where to set this storage capacity.

On Sat, 21 Mar 2015 20:30 Xi Shen davidshe...@gmail.com wrote: Hi Sean, It's getting strange now. If I run from the IDE, my executor memory is always set to 6.7G, no matter what value I set in code. I have checked my environment variables, and there's no value of 6.7 or 12.5. Any idea? Thanks, David

On Tue, 17 Mar 2015 00:35 null jishnu.prat...@wipro.com wrote: Hi Xi Shen, You could set spark.executor.memory in the code itself: new SparkConf().set("spark.executor.memory", "2g") Or you can try --conf spark.executor.memory=2g while submitting the jar. Regards, Jishnu Prathap

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Monday, March 16, 2015 2:06 PM To: Xi Shen Cc: user@spark.apache.org Subject: Re: How to set Spark executor memory? By default spark.executor.memory is set to 512m. I'm assuming that since you are submitting the job using spark-submit in local mode, it is not able to override the value. Can you try it without using spark-submit, as a standalone project? Thanks Best Regards

On Mon, Mar 16, 2015 at 1:52 PM, Xi Shen davidshe...@gmail.com wrote: I set it in code, not by configuration. I submit my jar file to local. I am working in my developer environment.

On Mon, 16 Mar 2015 18:28 Akhil Das ak...@sigmoidanalytics.com wrote: How are you setting it? And how are you submitting the job? Thanks Best Regards

On Mon, Mar 16, 2015 at 12:52 PM, Xi Shen davidshe...@gmail.com wrote: Hi, I have set spark.executor.memory to 2048m, and in the UI Environment page, I can see this value has been set correctly. But in the Executors page, I see there's only 1 executor and its memory is 265.4MB. Very strange value. Why not 256MB, or just what I set? What am I missing here? Thanks, David
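A minimal sketch of the two approaches above, assuming local mode (in local mode the executor lives inside the driver JVM, so the driver's heap is what matters); the app name and memory values are illustrative:

```scala
// Via spark-submit (the driver JVM hosts the executor in local mode):
//   spark-submit --master local[4] --driver-memory 4g --class MyApp my-app.jar

// From an IDE: add -Xmx4g to the run configuration's VM options, then:
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf().setMaster("local[4]").setAppName("mem-demo")
val sc = new SparkContext(conf)
```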
DataFrame saveAsTable - partitioned tables
Hi, I wanted to store DataFrames as partitioned Hive tables. Is there a way to do this via the saveAsTable call? The set of options does not seem to be documented.

def saveAsTable(tableName: String, source: String, mode: SaveMode, options: Map[String, String]): Unit

(Scala-specific) Creates a table from the contents of this DataFrame based on a given data source, a SaveMode specified by mode, and a set of options.

Optionally, is there a way to just create external Hive tables for data that is already present on HDFS? Something similar to:

sc.sql("alter table results add partition (date = '2014')")

Regards, Deenar
Re: Load balancing
Hi Mohit, please make sure you use the Reply to all button and include the mailing list, otherwise only I will get your message ;) Regarding your question: Yes, that's also my understanding. You can partition streaming RDDs only by time intervals, not by size. So depending on your incoming rate, they may vary. I do not know exactly what the life cycle of the receiver is, but I don't think anything actually happens when you create the DStream. My guess would be that the receiver is allocated when you call StreamingContext#startStreams(). Regards, Jeff

2015-03-21 21:19 GMT+01:00 Mohit Anchlia mohitanch...@gmail.com: Could somebody help me understand the question I posted earlier?

On Fri, Mar 20, 2015 at 9:44 AM, Mohit Anchlia mohitanch...@gmail.com wrote: Thanks for the pointer. Looking at the description below from the site, it looks like in Spark the block size is not fixed; it's determined by the block interval, and in fact for the same batch you could have different block sizes. Did I get it right?

- Another parameter that should be considered is the receiver's block interval, which is determined by the configuration parameter spark.streaming.blockInterval (http://spark.apache.org/docs/1.2.1/configuration.html#spark-streaming). For most receivers, the received data is coalesced together into blocks of data before being stored inside Spark's memory. The number of blocks in each batch determines the number of tasks that will be used to process the received data in a map-like transformation. The number of tasks per receiver per batch will be approximately (batch interval / block interval). For example, a block interval of 200 ms will create 10 tasks per 2 second batch. If the number of tasks is too low (that is, less than the number of cores per machine), it will be inefficient because all available cores will not be used to process the data. To increase the number of tasks for a given batch interval, reduce the block interval. However, the recommended minimum value of the block interval is about 50 ms, below which the task launching overheads may be a problem.

Also, I am not clear about the data flow of the receiver. When a client gets a handle to a Spark context and calls something like val lines = ssc.socketTextStream("localhost", ...), is this the point when the Spark master is contacted to determine which Spark worker node the data is going to go to?

On Fri, Mar 20, 2015 at 1:33 AM, Jeffrey Jedele jeffrey.jed...@gmail.com wrote: Hi Mohit, it also depends on what the source for your streaming application is. If you use Kafka, you can easily partition topics and have multiple receivers on different machines. If you have something like an HTTP, socket, etc. stream, you probably can't do that. The Spark RDDs generated by your receiver will be partitioned and processed in a distributed manner like usual Spark RDDs, however. There are parameters to control that behavior (e.g. defaultParallelism and blockInterval). See here for more details: http://spark.apache.org/docs/1.2.1/streaming-programming-guide.html#performance-tuning Regards, Jeff

2015-03-20 8:02 GMT+01:00 Akhil Das ak...@sigmoidanalytics.com: 1. If you are consuming data from Kafka or any other receiver-based sources, then you can start 1-2 receivers per worker (assuming you'll have at least 4 cores per worker). 2. If you have a single receiver or a fileStream, then what you can do to distribute the data across machines is a repartition.

Thanks Best Regards

On Thu, Mar 19, 2015 at 11:32 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am trying to understand how to load balance the incoming data to multiple spark streaming workers. Could somebody help me understand how I can distribute my incoming data from various sources such that incoming data is going to multiple spark streaming nodes? Is it done by the spark client with help of the spark master, similar to a hadoop client asking the namenode for the list of datanodes?
Should Spark SQL support retrieving column values from Row by column name?
I would like to retrieve a column value from a Spark SQL query result. But currently it seems that Spark SQL only supports retrieving by index:

val results = sqlContext.sql("SELECT name FROM people")
results.map(t => "Name: " + t(0)).collect().foreach(println)

I think it would be much more convenient if I could do something like this:

results.map(t => "Name: " + t("name")).collect().foreach(println)

What do you think?
Re: Should Spark SQL support retrieving column values from Row by column name?
If you use the latest version, Spark 1.3, you can use the DataFrame API like:

val results = sqlContext.sql("SELECT name FROM people")
results.select("name").show()

2015-03-22 15:40 GMT+08:00 amghost zhengweita...@gmail.com: I would like to retrieve a column value from a Spark SQL query result. But currently it seems that Spark SQL only supports retrieving by index: val results = sqlContext.sql("SELECT name FROM people") results.map(t => "Name: " + t(0)).collect().foreach(println) I think it would be much more convenient if I could do something like this: results.map(t => "Name: " + t("name")).collect().foreach(println) What do you think?
Re: How to do nested foreach with RDD
Hi Reza, Yes, I just found RDD.cartesian(). Very useful. Thanks, David

On Sun, Mar 22, 2015 at 5:08 PM Reza Zadeh r...@databricks.com wrote: You can do this with the 'cartesian' product method on RDD. For example:

val rdd1 = ...
val rdd2 = ...
val combinations = rdd1.cartesian(rdd2).filter { case (a, b) => a < b }

Reza

On Sat, Mar 21, 2015 at 10:37 PM, Xi Shen davidshe...@gmail.com wrote: Hi, I have two big RDDs, and I need to do some math against each pair of them. Traditionally, it is like a nested for-loop. But for RDDs, it causes a nested RDD, which is prohibited. Currently, I am collecting one of them, then doing a nested for-loop, so as to avoid a nested RDD. But I would like to know if there's a Spark way to do this. Thanks, David
Re: converting DStream[String] into RDD[String] in spark streaming
On Sun, Mar 22, 2015 at 8:43 AM, deenar.toraskar deenar.toras...@db.com wrote: 1) if there are no sliding window calls in this streaming context, will there be just one file written per interval?

As many files as there are partitions will be written in each interval.

2) if there is a sliding window call in the same context, such as

val hashTags = stream.flatMap(json => DataObjectFactory.createStatus(json).getText.split(" ").filter(_.startsWith("#")))
val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(600))
  .map { case (topic, count) => (count, topic) }
  .transform(_.sortByKey(false))

will some files get written multiple times (as long as the interval is in the batch)?

I don't think it's right to say files will be written many times, but yes, it is my understanding that data will be written many times, since a datum lies in many windows.
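For reference, a sketch of the output layout saveAsTextFiles produces (the timestamps in the directory names are hypothetical), plus one way to force a single file per interval by coalescing each batch RDD before saving:

```scala
// saveAsTextFiles writes one directory per batch, one part file per partition:
//   twitterRawJSON-1427000000000/part-00000
//   twitterRawJSON-1427000000000/part-00001
//   twitterRawJSON-1427030000000/part-00000
//   ...
// To get exactly one file per interval, coalesce to a single partition:
stream.foreachRDD { (rdd, time) =>
  rdd.coalesce(1).saveAsTextFile(
    s"hdfs://localhost:9000/twitterRawJSON-${time.milliseconds}")
}
```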
Re: ArrayIndexOutOfBoundsException in ALS.trainImplicit
My bad. This was an OutOfMemoryError disguised as something else. Regards, Sab

On Sun, Mar 22, 2015 at 1:53 AM, Sabarish Sasidharan sabarish.sasidha...@manthan.com wrote: I am consistently running into this ArrayIndexOutOfBoundsException issue when using trainImplicit. I have tried changing the partitions and switching to JavaSerializer, but they don't seem to help. I see that this is the same as https://issues.apache.org/jira/browse/SPARK-3080. My lambda is 0.01, rank is 5, iterations is 10 and alpha is 0.01. I am using 41 executors, each with 8GB, on a dataset of 48 million records.

15/03/21 13:07:29 ERROR executor.Executor: Exception in task 12.0 in stage 2808.0 (TID 40575)
java.lang.ArrayIndexOutOfBoundsException: 692
at org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateBlock$1.apply$mcVI$sp(ALS.scala:548)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$updateBlock(ALS.scala:542)
at org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateFeatures$2.apply(ALS.scala:510)
at org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateFeatures$2.apply(ALS.scala:509)
at org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValuesRDD.scala:31)
at org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValuesRDD.scala:31)

How can I get around this issue? Regards, Sab
How to use DataFrame with MySQL
OK, I know that I can use the jdbc connector to create a DataFrame with this command:

val jdbcDF = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:mysql://localhost:3306/video_rcmd?user=root&password=123456",
  "dbtable" -> "video"))

But I got this error: java.sql.SQLException: No suitable driver found for ... And I have tried to add the jdbc jar to the Spark classpath with both commands, but failed:

- spark-shell --jars mysql-connector-java-5.0.8-bin.jar
- SPARK_CLASSPATH=mysql-connector-java-5.0.8-bin.jar spark-shell

My Spark version is 1.3.0, while Class.forName("com.mysql.jdbc.Driver").newInstance works.
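For what it's worth, a hedged sketch of the same load call with the driver class named explicitly via the jdbc source's driver option (the URL, credentials, and table are the hypothetical ones from the post); this only helps if the connector jar is actually on the classpath:

```scala
val jdbcDF = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:mysql://localhost:3306/video_rcmd?user=root&password=123456",
  "driver" -> "com.mysql.jdbc.Driver", // ask the data source to load the class
  "dbtable" -> "video"))
```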
lower/upperBound not working/spark 1.3
Hi All - I am trying to use the new SQLContext API for populating a DataFrame from a jdbc data source, like this:

val jdbcDF = sqlContext.jdbc(
  url = "jdbc:postgresql://localhost:5430/dbname?user=user&password=111",
  table = "se_staging.exp_table3",
  columnName = "cs_id", lowerBound = 1, upperBound = 1, numPartitions = 12)

No matter how I set the lower and upper bounds, I always get all the rows from my table. The API is marked as experimental, so I assume there might be some bugs in it, but did anybody come across a similar issue? Thanks!
Re: How Does aggregate work
I assume spark.default.parallelism is 4 in the VM Ashish was using. Cheers
How Does aggregate work
Hi, I am not able to understand how the aggregate function works. Can someone please explain how the result below came about? I am running Spark using the Cloudera VM. The result below is 17, but I am not able to find out how it calculated 17:

val data = sc.parallelize(List(2,3,4))
data.aggregate(0)((x,y) => x+y, (x,y) => 2+x+y)
res21: Int = 17

Also, when I try to change the 2nd parameter in sc.parallelize, I get a different result:

val data = sc.parallelize(List(2,3,4),2)
data.aggregate(0)((x,y) => x+y, (x,y) => 2+x+y)
res21: Int = 13

Thanks for the help.
Re: Error while installing Spark 1.3.0 on local machine
Any particular reason you're not just downloading a build from http://spark.apache.org/downloads.html? Even if you aren't using Hadoop, any of those builds will work. If you want to build from source, the Maven build is more reliable. dean

Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Sat, Mar 21, 2015 at 5:52 PM, HARIPRIYA AYYALASOMAYAJULA aharipriy...@gmail.com wrote: Hello, I am trying to install Spark 1.3.0 on my mac. Earlier, I was working with Spark 1.1.0. Now, I came across this error:

sbt.ResolveException: unresolved dependency: org.apache.spark#spark-network-common_2.10;1.3.0: configuration not public in org.apache.spark#spark-network-common_2.10;1.3.0: 'test'. It was required from org.apache.spark#spark-network-shuffle_2.10;1.3.0 test
at sbt.IvyActions$.sbt$IvyActions$$resolve(IvyActions.scala:278)
at sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:175)
at sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:157)
at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:151)
at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:151)
at sbt.IvySbt$$anonfun$withIvy$1.apply(Ivy.scala:128)
at sbt.IvySbt.sbt$IvySbt$$action$1(Ivy.scala:56)
at sbt.IvySbt$$anon$4.call(Ivy.scala:64)
at xsbt.boot.Locks$GlobalLock.withChannel$1(Locks.scala:93)
at xsbt.boot.Locks$GlobalLock.xsbt$boot$Locks$GlobalLock$$withChannelRetries$1(Locks.scala:78)
at xsbt.boot.Locks$GlobalLock$$anonfun$withFileLock$1.apply(Locks.scala:97)
at xsbt.boot.Using$.withResource(Using.scala:10)
at xsbt.boot.Using$.apply(Using.scala:9)
at xsbt.boot.Locks$GlobalLock.ignoringDeadlockAvoided(Locks.scala:58)
at xsbt.boot.Locks$GlobalLock.withLock(Locks.scala:48)
at xsbt.boot.Locks$.apply0(Locks.scala:31)
at xsbt.boot.Locks$.apply(Locks.scala:28)
at sbt.IvySbt.withDefaultLogger(Ivy.scala:64)
at sbt.IvySbt.withIvy(Ivy.scala:123)
at sbt.IvySbt.withIvy(Ivy.scala:120)
at sbt.IvySbt$Module.withModule(Ivy.scala:151)
at sbt.IvyActions$.updateEither(IvyActions.scala:157)
at sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala:1318)
at sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala:1315)
at sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$85.apply(Defaults.scala:1345)
at sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$85.apply(Defaults.scala:1343)
at sbt.Tracked$$anonfun$lastOutput$1.apply(Tracked.scala:35)
at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:1348)
at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:1342)
at sbt.Tracked$$anonfun$inputChanged$1.apply(Tracked.scala:45)
at sbt.Classpaths$.cachedUpdate(Defaults.scala:1360)
at sbt.Classpaths$$anonfun$updateTask$1.apply(Defaults.scala:1300)
at sbt.Classpaths$$anonfun$updateTask$1.apply(Defaults.scala:1275)
at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40)
at sbt.std.Transform$$anon$4.work(System.scala:63)
at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226)
at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226)
at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
at sbt.Execute.work(Execute.scala:235)
at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159)
at sbt.CompletionService$$anon$2.call(CompletionService.scala:28)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)
[error] (network-shuffle/*:update) sbt.ResolveException: unresolved dependency: org.apache.spark#spark-network-common_2.10;1.3.0: configuration not public in org.apache.spark#spark-network-common_2.10;1.3.0: 'test'. It was required from org.apache.spark#spark-network-shuffle_2.10;1.3.0 test
[error] Total time: 5 s, completed Mar 21, 2015 7:48:45 PM

I tried uninstalling and re-installing. When I browsed the internet, I came across suggestions to include -Phadoop; now even if I use build/sbt -Pyarn -Phadoop-2.3 assembly, it gives me an error. I greatly appreciate any help. Thank you for your time. -- Regards, Haripriya Ayyalasomayajula Graduate Student Department of Computer Science University of Houston Contact :
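Regarding Dean's suggestion to use Maven instead: for reference, the invocation from the Spark 1.3 build documentation looks like the following (the Hadoop profile and version are examples; adjust them to your setup):

```
build/mvn -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -DskipTests clean package
```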
How to check that a dataset is sorted after it has been written out? [repost]
Greetings! [My apologies for this repost; I'm not certain that the first message made it to the list.] I sorted a dataset in Spark and then wrote it out in avro/parquet. Then I wanted to check that it was sorted. It looks like each partition has been sorted, but when reading in, the first partition (i.e., as seen in the partition index of mapPartitionsWithIndex) is not the same as implied by the names of the parquet files (even when the number of partitions is the same in the rdd which was read as on disk). If I take() a few hundred values, they are sorted, but they are *not* the same as if I explicitly open part-r-0.parquet and take values from that. It seems that when opening the rdd, the partitions of the rdd are not in the same order as implied by the data on disk (i.e., part-r-0.parquet, part-r-1.parquet, etc). So, how might one read the data so that one maintains the sort order? And while on the subject, after the terasort, how did they check that the data was actually sorted correctly? (Or did they? :-) ) Is there any way to read the data back in so as to preserve the sort, or do I need to zipWithIndex before writing it out, and write the index at that time? (I haven't tried the latter yet.) Thanks! -Mike
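In case it helps, a minimal sketch of one way to verify global sortedness of an RDD as read back, assuming an RDD[Long] (the function name and element type here are illustrative, not from the original post):

```scala
import org.apache.spark.rdd.RDD

// Each partition reports (index, first, last, sortedWithin); the RDD is
// globally sorted iff every partition is internally sorted and consecutive
// partitions have non-decreasing boundary values.
def isGloballySorted(rdd: RDD[Long]): Boolean = {
  val stats = rdd.mapPartitionsWithIndex { (i, it) =>
    val xs = it.toArray
    if (xs.isEmpty) Iterator.empty
    else Iterator((i, xs.head, xs.last,
      xs.sliding(2).forall(p => p.length < 2 || p(0) <= p(1))))
  }.collect().sortBy(_._1)
  stats.forall(_._4) &&
    stats.sliding(2).forall(p => p.length < 2 || p(0)._3 <= p(1)._2)
}
```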
Re: can the distinct transform be applied on a DStream?
aDstream.transform(_.distinct()) will only make the elements of each RDD in the DStream distinct, not for the whole DStream globally. Is that what you're seeing? Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Fri, Mar 20, 2015 at 10:37 AM, Darren Hoo darren@gmail.com wrote: val aDstream = ... val distinctStream = aDstream.transform(_.distinct()) but the elements in distinctStream are not distinct. Did I use it wrong?
Re: netlib-java cannot load native lib in Windows when using spark-submit
How about pointing LD_LIBRARY_PATH to the native lib folder? You need Spark 1.2.0 or higher for the above to work. See SPARK-1719. Cheers

On Sun, Mar 22, 2015 at 4:02 AM, Xi Shen davidshe...@gmail.com wrote: Hi Ted, I have tried to invoke the command from both the cygwin environment and the powershell environment. I still get the messages:

15/03/22 21:56:00 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
15/03/22 21:56:00 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS

From the Spark UI, I can see: spark.driver.extraLibrary c:\openblas

Thanks, David

On Sun, Mar 22, 2015 at 11:45 AM Ted Yu yuzhih...@gmail.com wrote: Can you try the --driver-library-path option? spark-submit --driver-library-path /opt/hadoop/lib/native ... Cheers

On Sat, Mar 21, 2015 at 4:58 PM, Xi Shen davidshe...@gmail.com wrote: Hi, I use the OpenBLAS DLL, and have configured my application to work in my IDE. When I start my Spark application from the IntelliJ IDE, I can see in the log that the native lib is loaded successfully. But if I use spark-submit to start my application, the native lib still cannot be loaded. I saw the WARN message that it failed to load both the native and native-ref library. I checked the Environment tab in the Spark UI, and the java.library.path is set correctly. Thanks, David
Re: How Does aggregate work
2 is added every time the final partition aggregator is called. The result of summing the elements across partitions is 9, of course. If you force a single partition (using spark-shell in local mode):

scala> val data = sc.parallelize(List(2,3,4),1)
scala> data.aggregate(0)((x,y) => x+y, (x,y) => 2+x+y)
res0: Int = 11

The 2nd function is still called, even though there is only one partition (presumably either x or y is set to 0). For every additional partition you specify as the 2nd arg to parallelize, the 2nd function will be called again:

scala> val data = sc.parallelize(List(2,3,4),1)
scala> data.aggregate(0)((x,y) => x+y, (x,y) => 2+x+y)
res0: Int = 11

scala> val data = sc.parallelize(List(2,3,4),2)
scala> data.aggregate(0)((x,y) => x+y, (x,y) => 2+x+y)
res0: Int = 13

scala> val data = sc.parallelize(List(2,3,4),3)
scala> data.aggregate(0)((x,y) => x+y, (x,y) => 2+x+y)
res0: Int = 15

scala> val data = sc.parallelize(List(2,3,4),4)
scala> data.aggregate(0)((x,y) => x+y, (x,y) => 2+x+y)
res0: Int = 17

Hence, it appears that not specifying the 2nd argument resulted in 4 partitions, even though you only had three elements in the list. If p_i is the ith partition, the final sum appears to be:

(2 + ... (2 + (2 + (2 + 0 + p_1) + p_2) + p_3) ...)

With 4 partitions whose elements sum to 9, that expands to 4*2 + 9 = 17.

Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Sun, Mar 22, 2015 at 8:05 AM, ashish.usoni ashish.us...@gmail.com wrote: Hi, I am not able to understand how the aggregate function works. Can someone please explain how the result below came about? I am running Spark using the Cloudera VM. The result below is 17, but I am not able to find out how it calculated 17: val data = sc.parallelize(List(2,3,4)) data.aggregate(0)((x,y) => x+y, (x,y) => 2+x+y) res21: Int = 17 Also, when I try to change the 2nd parameter in sc.parallelize, I get a different result: val data = sc.parallelize(List(2,3,4),2) data.aggregate(0)((x,y) => x+y, (x,y) => 2+x+y) res21: Int = 13 Thanks for the help.
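A quick spark-shell sanity check of that expansion, pinning the partition count to 4 explicitly:

```scala
val data = sc.parallelize(List(2, 3, 4), 4)
val result = data.aggregate(0)((x, y) => x + y, (x, y) => 2 + x + y)
// The combine function runs once per partition merged on the driver,
// so result = 4 * 2 + (2 + 3 + 4) = 17.
assert(result == 17)
```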
Re: Load balancing
Posting my question again :) Thanks for the pointer. Looking at the description below from the site, it looks like in Spark the block size is not fixed; it's determined by the block interval, and in fact for the same batch you could have different block sizes. Did I get it right?

- Another parameter that should be considered is the receiver's block interval, which is determined by the configuration parameter spark.streaming.blockInterval (http://spark.apache.org/docs/1.2.1/configuration.html#spark-streaming). For most receivers, the received data is coalesced together into blocks of data before being stored inside Spark's memory. The number of blocks in each batch determines the number of tasks that will be used to process the received data in a map-like transformation. The number of tasks per receiver per batch will be approximately (batch interval / block interval). For example, a block interval of 200 ms will create 10 tasks per 2 second batch. If the number of tasks is too low (that is, less than the number of cores per machine), it will be inefficient because all available cores will not be used to process the data. To increase the number of tasks for a given batch interval, reduce the block interval. However, the recommended minimum value of the block interval is about 50 ms, below which the task launching overheads may be a problem.

Also, I am not clear about the data flow of the receiver. When a client gets a handle to a Spark context and calls something like val lines = ssc.socketTextStream("localhost", ...), is this the point when the Spark master is contacted to determine which Spark worker node the data is going to go to?

On Sun, Mar 22, 2015 at 2:10 AM, Jeffrey Jedele jeffrey.jed...@gmail.com wrote: Hi Mohit, please make sure you use the Reply to all button and include the mailing list, otherwise only I will get your message ;) Regarding your question: Yes, that's also my understanding. You can partition streaming RDDs only by time intervals, not by size. So depending on your incoming rate, they may vary. I do not know exactly what the life cycle of the receiver is, but I don't think anything actually happens when you create the DStream. My guess would be that the receiver is allocated when you call StreamingContext#startStreams(). Regards, Jeff

2015-03-21 21:19 GMT+01:00 Mohit Anchlia mohitanch...@gmail.com: Could somebody help me understand the question I posted earlier?

On Fri, Mar 20, 2015 at 9:44 AM, Mohit Anchlia mohitanch...@gmail.com wrote: Thanks for the pointer. Looking at the description below from the site, it looks like in Spark the block size is not fixed; it's determined by the block interval, and in fact for the same batch you could have different block sizes. Did I get it right?

- Another parameter that should be considered is the receiver's block interval, which is determined by the configuration parameter spark.streaming.blockInterval (http://spark.apache.org/docs/1.2.1/configuration.html#spark-streaming). For most receivers, the received data is coalesced together into blocks of data before being stored inside Spark's memory. The number of blocks in each batch determines the number of tasks that will be used to process the received data in a map-like transformation. The number of tasks per receiver per batch will be approximately (batch interval / block interval). For example, a block interval of 200 ms will create 10 tasks per 2 second batch. If the number of tasks is too low (that is, less than the number of cores per machine), it will be inefficient because all available cores will not be used to process the data. To increase the number of tasks for a given batch interval, reduce the block interval. However, the recommended minimum value of the block interval is about 50 ms, below which the task launching overheads may be a problem.

Also, I am not clear about the data flow of the receiver. When a client gets a handle to a Spark context and calls something like val lines = ssc.socketTextStream("localhost", ...), is this the point when the Spark master is contacted to determine which Spark worker node the data is going to go to?

On Fri, Mar 20, 2015 at 1:33 AM, Jeffrey Jedele jeffrey.jed...@gmail.com wrote: Hi Mohit, it also depends on what the source for your streaming application is. If you use Kafka, you can easily partition topics and have multiple receivers on different machines. If you have something like an HTTP, socket, etc. stream, you probably can't do that. The Spark RDDs generated by your receiver will be partitioned and processed in a distributed manner like usual Spark RDDs, however. There are parameters to control that behavior (e.g. defaultParallelism and blockInterval). See here for more details: http://spark.apache.org/docs/1.2.1/streaming-programming-guide.html#performance-tuning Regards, Jeff

2015-03-20 8:02 GMT+01:00 Akhil Das ak...@sigmoidanalytics.com: 1. If you are consuming
Re: lower/upperBound not working/spark 1.3
From the javadoc of JDBCRelation#columnPartition():

* Given a partitioning schematic (a column of integral type, a number of
* partitions, and upper and lower bounds on the column's value), generate

In your example, 1 and 1 are values of the cs_id column. Looks like all the values in that column fall within the range of 1 and 1000. Cheers

On Sun, Mar 22, 2015 at 8:44 AM, Marek Wiewiorka marek.wiewio...@gmail.com wrote: Hi All - I am trying to use the new SQLContext API for populating a DataFrame from a jdbc data source, like this:

val jdbcDF = sqlContext.jdbc(
  url = "jdbc:postgresql://localhost:5430/dbname?user=user&password=111",
  table = "se_staging.exp_table3",
  columnName = "cs_id", lowerBound = 1, upperBound = 1, numPartitions = 12)

No matter how I set the lower and upper bounds, I always get all the rows from my table. The API is marked as experimental, so I assume there might be some bugs in it, but did anybody come across a similar issue? Thanks!
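Worth noting, per that javadoc: the bounds only control how the column range is carved into per-partition WHERE clauses; they do not filter rows, which would explain why every row always comes back. A hedged re-derivation of the stride scheme with made-up bounds for illustration:

```scala
// Hypothetical bounds [1, 10] split 3 ways, following the stride logic
// described in the JDBCRelation javadoc:
val (lower, upper, numPartitions) = (1L, 10L, 3)
val stride = (upper - lower) / numPartitions
val predicates = (0 until numPartitions).map { i =>
  val lo = lower + i * stride
  val hi = lo + stride
  if (i == 0) s"cs_id < $hi"
  else if (i == numPartitions - 1) s"cs_id >= $lo"
  else s"cs_id >= $lo AND cs_id < $hi"
}
// predicates: Vector(cs_id < 4, cs_id >= 4 AND cs_id < 7, cs_id >= 7)
// Together the predicates cover the whole range, so no rows are excluded.
```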
Re: Should Spark SQL support retrieving column values from Row by column name?
Please open a JIRA; we added the info to Row that will allow this to happen, but we need to provide the methods you are asking for. I'll add that this does work today in Python (i.e. row.columnName).

On Sun, Mar 22, 2015 at 12:40 AM, amghost zhengweita...@gmail.com wrote: I would like to retrieve a column value from a Spark SQL query result. But currently it seems that Spark SQL only supports retrieving by index: val results = sqlContext.sql("SELECT name FROM people") results.map(t => "Name: " + t(0)).collect().foreach(println) I think it would be much more convenient if I could do something like this: results.map(t => "Name: " + t("name")).collect().foreach(println) What do you think?
Re: lower/upperBound not working/spark 1.3
...I even tried setting the upper/lower bounds to the same value, like 1 or 10, with the same result. cs_id is a column with cardinality ~5*10^6, so this is not the case here. Regards, Marek

2015-03-22 20:30 GMT+01:00 Ted Yu yuzhih...@gmail.com: From the javadoc of JDBCRelation#columnPartition(): * Given a partitioning schematic (a column of integral type, a number of * partitions, and upper and lower bounds on the column's value), generate In your example, 1 and 1 are values of the cs_id column. Looks like all the values in that column fall within the range of 1 and 1000. Cheers

On Sun, Mar 22, 2015 at 8:44 AM, Marek Wiewiorka marek.wiewio...@gmail.com wrote: Hi All - I am trying to use the new SQLContext API for populating a DataFrame from a jdbc data source, like this: val jdbcDF = sqlContext.jdbc(url = "jdbc:postgresql://localhost:5430/dbname?user=user&password=111", table = "se_staging.exp_table3", columnName = "cs_id", lowerBound = 1, upperBound = 1, numPartitions = 12) No matter how I set the lower and upper bounds, I always get all the rows from my table. The API is marked as experimental, so I assume there might be some bugs in it, but did anybody come across a similar issue? Thanks!
Re: lower/upperBound not working/spark 1.3
I went over JDBCRelation#columnPartition() but didn't find an obvious clue (you can add more logging to confirm that the partitions were generated correctly). Looks like the issue may be somewhere else. Cheers

On Sun, Mar 22, 2015 at 12:47 PM, Marek Wiewiorka marek.wiewio...@gmail.com wrote: ...I even tried setting the upper/lower bounds to the same value, like 1 or 10, with the same result. cs_id is a column with cardinality ~5*10^6, so this is not the case here. Regards, Marek

2015-03-22 20:30 GMT+01:00 Ted Yu yuzhih...@gmail.com: From the javadoc of JDBCRelation#columnPartition(): * Given a partitioning schematic (a column of integral type, a number of * partitions, and upper and lower bounds on the column's value), generate In your example, 1 and 1 are values of the cs_id column. Looks like all the values in that column fall within the range of 1 and 1000. Cheers

On Sun, Mar 22, 2015 at 8:44 AM, Marek Wiewiorka marek.wiewio...@gmail.com wrote: Hi All - I am trying to use the new SQLContext API for populating a DataFrame from a jdbc data source, like this: val jdbcDF = sqlContext.jdbc(url = "jdbc:postgresql://localhost:5430/dbname?user=user&password=111", table = "se_staging.exp_table3", columnName = "cs_id", lowerBound = 1, upperBound = 1, numPartitions = 12) No matter how I set the lower and upper bounds, I always get all the rows from my table. The API is marked as experimental, so I assume there might be some bugs in it, but did anybody come across a similar issue? Thanks!
Re: netlib-java cannot load native lib in Windows when using spark-submit
Did you build Spark with -Pnetlib-lgpl? Ref: https://spark.apache.org/docs/latest/mllib-guide.html Burak

On Sun, Mar 22, 2015 at 7:37 AM, Ted Yu yuzhih...@gmail.com wrote: How about pointing LD_LIBRARY_PATH to the native lib folder? You need Spark 1.2.0 or higher for the above to work. See SPARK-1719. Cheers

On Sun, Mar 22, 2015 at 4:02 AM, Xi Shen davidshe...@gmail.com wrote: Hi Ted, I have tried to invoke the command from both the cygwin environment and the powershell environment. I still get the messages:

15/03/22 21:56:00 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
15/03/22 21:56:00 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS

From the Spark UI, I can see: spark.driver.extraLibrary c:\openblas

Thanks, David

On Sun, Mar 22, 2015 at 11:45 AM Ted Yu yuzhih...@gmail.com wrote: Can you try the --driver-library-path option? spark-submit --driver-library-path /opt/hadoop/lib/native ... Cheers

On Sat, Mar 21, 2015 at 4:58 PM, Xi Shen davidshe...@gmail.com wrote: Hi, I use the OpenBLAS DLL, and have configured my application to work in my IDE. When I start my Spark application from the IntelliJ IDE, I can see in the log that the native lib is loaded successfully. But if I use spark-submit to start my application, the native lib still cannot be loaded. I saw the WARN message that it failed to load both the native and native-ref library. I checked the Environment tab in the Spark UI, and the java.library.path is set correctly. Thanks, David
Re: DataFrame saveAsTable - partitioned tables
Note you can use HiveQL syntax for creating dynamically partitioned tables, though.

On Sun, Mar 22, 2015 at 1:29 PM, Michael Armbrust mich...@databricks.com wrote: Not yet. This is on the roadmap for Spark 1.4.

On Sun, Mar 22, 2015 at 12:19 AM, deenar.toraskar deenar.toras...@db.com wrote: Hi, I wanted to store DataFrames as partitioned Hive tables. Is there a way to do this via the saveAsTable call? The set of options does not seem to be documented. def saveAsTable(tableName: String, source: String, mode: SaveMode, options: Map[String, String]): Unit (Scala-specific) Creates a table from the contents of this DataFrame based on a given data source, a SaveMode specified by mode, and a set of options. Optionally, is there a way to just create external Hive tables for data that is already present on HDFS? Something similar to: sc.sql("alter table results add partition (date = '2014')") Regards, Deenar
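A hedged sketch of that HiveQL route, assuming a HiveContext and a registered table named results with name, value, and date columns (all names here are illustrative, not from the original post):

```scala
// Create a partitioned table, enable dynamic partitioning, then insert;
// the partition column goes last in the SELECT.
sqlContext.sql(
  "CREATE TABLE results_part (name STRING, value DOUBLE) PARTITIONED BY (date STRING)")
sqlContext.sql("SET hive.exec.dynamic.partition = true")
sqlContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
sqlContext.sql(
  "INSERT OVERWRITE TABLE results_part PARTITION (date) SELECT name, value, date FROM results")
```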
Re: DataFrame saveAsTable - partitioned tables
Not yet. This is on the roadmap for Spark 1.4.

On Sun, Mar 22, 2015 at 12:19 AM, deenar.toraskar deenar.toras...@db.com wrote: Hi, I wanted to store DataFrames as partitioned Hive tables. Is there a way to do this via the saveAsTable call? The set of options does not seem to be documented. def saveAsTable(tableName: String, source: String, mode: SaveMode, options: Map[String, String]): Unit (Scala-specific) Creates a table from the contents of this DataFrame based on a given data source, a SaveMode specified by mode, and a set of options. Optionally, is there a way to just create external Hive tables for data that is already present on HDFS? Something similar to: sc.sql("alter table results add partition (date = '2014')") Regards, Deenar
Re: join two DataFrames, same column name
You can include * and a column alias in the same select clause:

var df1 = sqlContext.sql("select *, column_id AS table1_id from table1")

I'm also hoping to resolve SPARK-6376 (https://issues.apache.org/jira/browse/SPARK-6376) before Spark 1.3.1, which will let you do something like:

var df1 = sqlContext.sql("select * from table1").as("t1")
var df2 = sqlContext.sql("select * from table2").as("t2")
df1.join(df2, df1("column_id") === df2("column_id")).select("t1.column_id")

Finally, there is SPARK-6380 (https://issues.apache.org/jira/browse/SPARK-6380), which hopes to simplify this particular case.

Michael

On Sat, Mar 21, 2015 at 3:02 PM, Eric Friedman eric.d.fried...@gmail.com wrote: I have a couple of data frames that I pulled from Spark SQL, and the primary key of one is a foreign key of the same name in the other. I'd rather not have to specify each column in the SELECT statement just so that I can rename this single column. When I try to join the data frames, I get an exception because it finds the two columns of the same name to be ambiguous. Is there a way to specify which side of the join comes from data frame A and which comes from B?

var df1 = sqlContext.sql("select * from table1")
var df2 = sqlContext.sql("select * from table2")
df1.join(df2, df1("column_id") === df2("column_id"))
Re: 'nested' RDD problem, advise needed
Something like this?

(2 to alphabetLength toList)
  .map(shift => Object.myFunction(inputRDD, shift).map(v => shift -> v))
  .foldLeft(Object.myFunction(inputRDD, 1).map(v => 1 -> v))(_ union _)

which is an RDD[(Int, Char)]. The problem is that you can't play with RDDs inside of RDDs: the recursive structure breaks the Spark programming model.

On Sat, Mar 21, 2015 at 10:26 AM, Michael Lewis lewi...@me.com wrote: Hi, I wonder if someone can help suggest a solution to my problem. I had a simple process working using Strings and now want to convert to RDD[Char]; the problem is that I end up with a nested call, as follows:

1) Load a text file into an RDD[Char]:

val inputRDD = sc.textFile("myFile.txt").flatMap(_.toIterator)

2) I have a method that takes two parameters:

object Foo {
  def myFunction(inputRDD: RDD[Char], shift: Int): RDD[Char] = ...

3) I have a method that the driver process calls once it has loaded the inputRDD, 'bar', as follows:

def bar(inputRDD: RDD[Char]): Int = {
  val solutionSet = sc.parallelize(1 to alphabetLength toList).map(shift => (shift, Object.myFunction(inputRDD, shift)))

What I'm trying to do is take a list 1..26 and generate a set of tuples { (1, RDD(1)), ..., (26, RDD(26)) }, which is the inputRDD passed through the function above, but with a different shift parameter each time. In my original version I could parallelise the algorithm fine, but my input string had to be in a 'String' variable; I'd rather it be an RDD (the string could be large). I think the way I'm trying to do it above won't work because it's a nested RDD call. Can anybody suggest a solution? Regards, Mike Lewis
Convert Spark SQL table to RDD in Scala / error: value toFloat is not a member of Any
I'm following an online tutorial written in Python and trying to convert a Spark SQL table object to an RDD in Scala. The Spark SQL just loads a simple table from a CSV file. The tutorial says to convert the table to an RDD. The Python is:

products_rdd = sqlContext.table("products").map(lambda row: (float(row[0]), float(row[1]), float(row[2]), float(row[3]), float(row[4]), float(row[5]), float(row[6]), float(row[7]), float(row[8]), float(row[9]), float(row[10]), float(row[11])))

The Scala is *not*:

val productsRdd = sqlContext.table("products").map( row => (
  row(0).toFloat, row(1).toFloat, row(2).toFloat, row(3).toFloat,
  row(4).toFloat, row(5).toFloat, row(6).toFloat, row(7).toFloat,
  row(8).toFloat, row(9).toFloat, row(10).toFloat, row(11).toFloat
))

I know this because Spark says, for each of the row(x).toFloat calls:

error: value toFloat is not a member of Any

Does anyone know the proper syntax for this? Thank you
Re: How to do nested foreach with RDD
You can do this with the 'cartesian' product method on RDD. For example:

val rdd1 = ...
val rdd2 = ...
val combinations = rdd1.cartesian(rdd2).filter { case (a, b) => a < b }

Reza

On Sat, Mar 21, 2015 at 10:37 PM, Xi Shen davidshe...@gmail.com wrote: Hi, I have two big RDDs, and I need to do some math against each pair of them. Traditionally, it is like a nested for-loop. But for RDDs, it causes a nested RDD, which is prohibited. Currently, I am collecting one of them, then doing a nested for-loop, so as to avoid a nested RDD. But I would like to know if there's a Spark way to do this. Thanks, David
Re: can the distinct transform be applied on a DStream?
What do you mean, not distinct? It does work for me. Code:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkContext, SparkConf}

val ssc = new StreamingContext(sc, Seconds(1))
val data = ssc.textFileStream("/home/akhld/mobi/localcluster/spark-1/sigmoid/")
val dist = data.transform(_.distinct())
dist.print()

ssc.start()
ssc.awaitTermination()

Thanks Best Regards

On Fri, Mar 20, 2015 at 11:07 PM, Darren Hoo darren@gmail.com wrote: val aDstream = ... val distinctStream = aDstream.transform(_.distinct()) but the elements in distinctStream are not distinct. Did I use it wrong?
Re: Convert Spark SQL table to RDD in Scala / error: value toFloat is not a member of Any
You need either

.map { row => (row(0).asInstanceOf[Float], row(1).asInstanceOf[Float], ...) }

or

.map { case Row(f0: Float, f1: Float, ...) => (f0, f1) }

On 3/23/15 9:08 AM, Minnow Noir wrote: I'm following an online tutorial written in Python and trying to convert a Spark SQL table object to an RDD in Scala. The Spark SQL just loads a simple table from a CSV file. The tutorial says to convert the table to an RDD. The Python is: products_rdd = sqlContext.table("products").map(lambda row: (float(row[0]), float(row[1]), float(row[2]), float(row[3]), float(row[4]), float(row[5]), float(row[6]), float(row[7]), float(row[8]), float(row[9]), float(row[10]), float(row[11]))) The Scala is *not*: val productsRdd = sqlContext.table("products").map( row => (row(0).toFloat, row(1).toFloat, row(2).toFloat, row(3).toFloat, row(4).toFloat, row(5).toFloat, row(6).toFloat, row(7).toFloat, row(8).toFloat, row(9).toFloat, row(10).toFloat, row(11).toFloat)) I know this because Spark says, for each of the row(x).toFloat calls: error: value toFloat is not a member of Any Does anyone know the proper syntax for this? Thank you
Re: Convert Spark SQL table to RDD in Scala / error: value toFloat is not a member of Any
I thought of formation #1, but it looks like when there are many fields, formation #2 is cleaner. Cheers

On Sun, Mar 22, 2015 at 8:14 PM, Cheng Lian lian.cs@gmail.com wrote: You need either .map { row => (row(0).asInstanceOf[Float], row(1).asInstanceOf[Float], ...) } or .map { case Row(f0: Float, f1: Float, ...) => (f0, f1) }

On 3/23/15 9:08 AM, Minnow Noir wrote: I'm following an online tutorial written in Python and trying to convert a Spark SQL table object to an RDD in Scala. The Spark SQL just loads a simple table from a CSV file. The tutorial says to convert the table to an RDD. The Python is: products_rdd = sqlContext.table("products").map(lambda row: (float(row[0]), float(row[1]), float(row[2]), float(row[3]), float(row[4]), float(row[5]), float(row[6]), float(row[7]), float(row[8]), float(row[9]), float(row[10]), float(row[11]))) The Scala is *not*: val productsRdd = sqlContext.table("products").map( row => (row(0).toFloat, row(1).toFloat, row(2).toFloat, row(3).toFloat, row(4).toFloat, row(5).toFloat, row(6).toFloat, row(7).toFloat, row(8).toFloat, row(9).toFloat, row(10).toFloat, row(11).toFloat)) I know this because Spark says, for each of the row(x).toFloat calls: error: value toFloat is not a member of Any Does anyone know the proper syntax for this? Thank you
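If the table is wide, a hedged variant of formation #1 that avoids spelling out all twelve columns (assuming every column holds a value whose string form parses as a Float; the result is a Seq[Float] instead of a 12-tuple):

```scala
val productsRdd = sqlContext.table("products").map { row =>
  row.toSeq.map(_.toString.toFloat) // parse each Any column via its string form
}
```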
SocketTimeout only when launching lots of executors
Hi, Spark users. When running a Spark application with lots of executors (300+), I see the following failures:

java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:152)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:690)
at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:633)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1324)
at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:583)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:421)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:356)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:353)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:353)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

When I reduce the number of executors, the Spark app runs fine. From the stack trace, it looks like multiple executors requesting dependency downloads at the same time is causing the driver to time out? Has anyone experienced similar issues, or does anyone have suggestions? Thanks
Re: Spark sql thrift server slower than hive
How are you running your Spark instance, out of curiosity? Via YARN or standalone mode? When connecting the Spark thriftserver to the Spark service, have you allocated enough memory and CPU when executing with Spark?

On Sun, Mar 22, 2015 at 3:39 AM fanooos dev.fano...@gmail.com wrote: We have Cloudera CDH 5.3 installed on one machine. We are trying to use the Spark SQL thrift server to execute some analysis queries against a Hive table. Without any changes to the configuration, we run the following query on both Hive and the Spark SQL thrift server: select * from tableName; The time taken by Spark is larger than the time taken by Hive, which is not supposed to be like that. The Hive table is mapped to JSON files stored in an HDFS directory, and we are using org.openx.data.jsonserde.JsonSerDe for serialization/deserialization. Why does Spark take much more time to execute the query than Hive?
Re: spark disk-to-disk
On Sun, Mar 22, 2015 at 6:03 PM, Koert Kuipers ko...@tresata.com wrote: so finally I can resort to: rdd.saveAsObjectFile(...) sc.objectFile(...) but that seems like a rather broken abstraction.

This seems like a fine solution to me.