Re: Is sorting persisted after pair rdd transformations?
If something is persisted you can easily see it under the Storage tab in the web UI. Thanks Best Regards On Tue, Nov 18, 2014 at 7:26 PM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: I am trying to figure out whether sorting is persisted after applying pair RDD transformations, and I am not able to tell decisively after reading the documentation. For example: val numbers = .. // RDD of numbers val pairedNumbers = numbers.map(number => (number % 100, number)) val sortedPairedNumbers = pairedNumbers.sortBy(pairedNumber => pairedNumber._2) // Sort by values in the pair val aggregates = sortedPairedNumbers.combineByKey(..) In this example, will the combine functions see values in sorted order? What if I had done groupByKey and then combineByKey? Which transformations can unsort already sorted data?
Merging Parquet Files
Hello, I'm writing a process that ingests JSON files and saves them as Parquet files. The process is as such: val sqlContext = new org.apache.spark.sql.SQLContext(sc) val jsonRequests = sqlContext.jsonFile("/requests") val parquetRequests = sqlContext.parquetFile("/requests_parquet") jsonRequests.registerTempTable("jsonRequests") parquetRequests.registerTempTable("parquetRequests") val unified_requests = sqlContext.sql("select * from jsonRequests union select * from parquetRequests") unified_requests.saveAsParquetFile("/tempdir") and then I delete /requests_parquet and rename /tempdir as /requests_parquet. Is there a better way to achieve that? Another problem I have is that I get a lot of small JSON files and as a result a lot of small Parquet files. I'd like to merge the JSON files into a few Parquet files; how do I do that? Thank you, Daniel
Re: Merging Parquet Files
You can also insert into existing tables via .insertInto(tableName, overwrite). You just have to import sqlContext._ On 19.11.2014, at 09:41, Daniel Haviv danielru...@gmail.com wrote: Hello, I'm writing a process that ingests json files and saves them a parquet files. The process is as such: val sqlContext = new org.apache.spark.sql.SQLContext(sc) val jsonRequests=sqlContext.jsonFile(/requests) val parquetRequests=sqlContext.parquetFile(/requests_parquet) jsonRequests.registerTempTable(jsonRequests) parquetRequests.registerTempTable(parquetRequests) val unified_requests=sqlContext.sql(select * from jsonRequests union select * from parquetRequests) unified_requests.saveAsParquetFile(/tempdir) and then I delete /requests_parquet and rename /tempdir as /requests_parquet Is there a better way to achieve that ? Another problem I have is that I get a lot of small json files and as a result a lot of small parquet files, I'd like to merge the json files into a few parquet files.. how I do that? Thank you, Daniel
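A minimal sketch of the insertInto approach suggested above, reusing the path and table names from the original question (whether the inserted rows land in the existing Parquet directory depends on the underlying relation, so treat this as an illustration rather than a drop-in replacement):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._   // brings in the implicits insertInto relies on

val parquetRequests = sqlContext.parquetFile("/requests_parquet")
parquetRequests.registerTempTable("parquetRequests")

val jsonRequests = sqlContext.jsonFile("/requests")
jsonRequests.insertInto("parquetRequests")                      // append
// jsonRequests.insertInto("parquetRequests", overwrite = true) // or replace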
Re: Merging Parquet Files
Very cool thank you! On Wed, Nov 19, 2014 at 11:15 AM, Marius Soutier mps@gmail.com wrote: You can also insert into existing tables via .insertInto(tableName, overwrite). You just have to import sqlContext._ On 19.11.2014, at 09:41, Daniel Haviv danielru...@gmail.com wrote: Hello, I'm writing a process that ingests json files and saves them a parquet files. The process is as such: val sqlContext = new org.apache.spark.sql.SQLContext(sc) val jsonRequests=sqlContext.jsonFile(/requests) val parquetRequests=sqlContext.parquetFile(/requests_parquet) jsonRequests.registerTempTable(jsonRequests) parquetRequests.registerTempTable(parquetRequests) val unified_requests=sqlContext.sql(select * from jsonRequests union select * from parquetRequests) unified_requests.saveAsParquetFile(/tempdir) and then I delete /requests_parquet and rename /tempdir as /requests_parquet Is there a better way to achieve that ? Another problem I have is that I get a lot of small json files and as a result a lot of small parquet files, I'd like to merge the json files into a few parquet files.. how I do that? Thank you, Daniel
RE: Spark to eliminate full-table scan latency
Yes, you can serve queries over your RDD data and return results to the user/client as long as your driver is alive. For example, I have built a Play! application that acts as a driver (creating a Spark context), loads up data from my database, organizes it, and subsequently receives and processes user queries over HTTP. As long as my Play! application is running, my Spark application is kept alive within the cluster. You can also have a look at this from Ooyala: https://github.com/ooyala/spark-jobserver
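A bare-bones sketch of this pattern (the object name, paths and lookup logic are invented for illustration): create one long-lived SparkContext in the server process, cache the data, and answer each incoming request with a job against the cached RDD.

import org.apache.spark.{SparkConf, SparkContext}

object QueryServer {
  // One context for the lifetime of the server process
  val sc = new SparkContext(new SparkConf().setAppName("query-server"))

  // Load and cache the data once, up front
  val records = sc.textFile("/data/table.csv").map(_.split(",")).cache()
  records.count()   // materialize the cache before serving traffic

  // Called from the web layer (e.g. a Play! controller) for each request
  def lookup(key: String): Array[Array[String]] =
    records.filter(row => row.nonEmpty && row(0) == key).collect()
}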
Why is ALS class serializable ?
Hi, When reading through the ALS code, I find that: class ALS private ( private var numUserBlocks: Int, private var numProductBlocks: Int, private var rank: Int, private var iterations: Int, private var lambda: Double, private var implicitPrefs: Boolean, private var alpha: Double, private var seed: Long = System.nanoTime() ) extends Serializable with Logging Why should ALS extend Serializable? If it did not, there would be an exception: task is not serializable, ALS is not serializable. I did not find any closure functions in which ALS is referenced. Any idea? Thx. Hao
Re: Understanding spark operation pipeline and block storage
Does anyone have an idea on this? Thx
Efficient way to split an input data set into different output files
I'm trying to set up a PySpark ETL job that takes in JSON log files and spits out fact table files for upload to Redshift. Is there an efficient way to send different event types to different outputs without having to just read the same cached RDD twice? I have my first RDD, which is just a JSON-parsed version of the input data, and I need to create a flattened page views dataset off this based on eventType = 'INITIAL', and then a page events dataset from the same RDD based on eventType = 'ADDITIONAL'. Ideally I'd like the output files for both these tables to be written at the same time, so I'm picturing a function with one input RDD in and two RDDs out, or a function utilising two CSV writers. I'm using mapPartitions at the moment to write to files like this:

def write_records(records):
    output = StringIO.StringIO()
    writer = vlad.CsvUnicodeWriter(output, dialect='excel')
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]

and I use this in the call to write the file as follows (pageviews and events get created off the same JSON-parsed RDD by filtering on INITIAL or ADDITIONAL respectively):

pageviews.mapPartitions(write_records).saveAsTextFile('s3n://output/pageviews/')
events.mapPartitions(write_records).saveAsTextFile('s3n://output/events/')

Is there a way to change this so that both are written in the same process?
Re: Is sorting persisted after pair rdd transformations?
Akhil, I think Aniket uses the word "persisted" in a different way than what you mean, i.e. not in the RDD.persist() sense. Aniket asks if running combineByKey on a sorted RDD will result in a sorted RDD (i.e. the sorting is preserved). I think the answer is no. combineByKey uses AppendOnlyMap, which is a hashmap. That will shuffle your keys. You can quickly verify it in spark-shell: scala> sc.parallelize(7 to 8).map(_ -> 1).reduceByKey(_ + _).collect res0: Array[(Int, Int)] = Array((8,1), (7,1)) (The initial size of the AppendOnlyMap seems to be 8, so 8 is the first number that demonstrates this.) On Wed, Nov 19, 2014 at 9:05 AM, Akhil Das ak...@sigmoidanalytics.com wrote: If something is persisted you can easily see it under the Storage tab in the web UI. Thanks Best Regards On Tue, Nov 18, 2014 at 7:26 PM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: I am trying to figure out whether sorting is persisted after applying pair RDD transformations, and I am not able to tell decisively after reading the documentation. For example: val numbers = .. // RDD of numbers val pairedNumbers = numbers.map(number => (number % 100, number)) val sortedPairedNumbers = pairedNumbers.sortBy(pairedNumber => pairedNumber._2) // Sort by values in the pair val aggregates = sortedPairedNumbers.combineByKey(..) In this example, will the combine functions see values in sorted order? What if I had done groupByKey and then combineByKey? Which transformations can unsort already sorted data?
Re: Is sorting persisted after pair rdd transformations?
Thanks Daniel. I can understand that the keys will not be in sorted order, but what I am trying to understand is whether the functions are passed values in sorted order within a given partition. For example: sc.parallelize(1 to 8).map(i => (1, i)).sortBy(t => t._2).foldByKey(0)((a, b) => b).collect res0: Array[(Int, Int)] = Array((1,8)) The fold always gives me 8 as the last value, which suggests the values preserve the sorting defined in an earlier stage of the DAG? On Wed Nov 19 2014 at 18:10:11 Daniel Darabos daniel.dara...@lynxanalytics.com wrote: Akhil, I think Aniket uses the word "persisted" in a different way than what you mean, i.e. not in the RDD.persist() sense. Aniket asks if running combineByKey on a sorted RDD will result in a sorted RDD (i.e. the sorting is preserved). I think the answer is no. combineByKey uses AppendOnlyMap, which is a hashmap. That will shuffle your keys. You can quickly verify it in spark-shell: scala> sc.parallelize(7 to 8).map(_ -> 1).reduceByKey(_ + _).collect res0: Array[(Int, Int)] = Array((8,1), (7,1)) (The initial size of the AppendOnlyMap seems to be 8, so 8 is the first number that demonstrates this.) On Wed, Nov 19, 2014 at 9:05 AM, Akhil Das ak...@sigmoidanalytics.com wrote: If something is persisted you can easily see it under the Storage tab in the web UI. Thanks Best Regards On Tue, Nov 18, 2014 at 7:26 PM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: I am trying to figure out whether sorting is persisted after applying pair RDD transformations, and I am not able to tell decisively after reading the documentation. For example: val numbers = .. // RDD of numbers val pairedNumbers = numbers.map(number => (number % 100, number)) val sortedPairedNumbers = pairedNumbers.sortBy(pairedNumber => pairedNumber._2) // Sort by values in the pair val aggregates = sortedPairedNumbers.combineByKey(..) In this example, will the combine functions see values in sorted order? What if I had done groupByKey and then combineByKey? Which transformations can unsort already sorted data?
Re: Is sorting persisted after pair rdd transformations?
Ah, so I misunderstood you too :). My reading of org/ apache/spark/Aggregator.scala is that your function will always see the items in the order that they are in the input RDD. An RDD partition is always accessed as an iterator, so it will not be read out of order. On Wed, Nov 19, 2014 at 2:28 PM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: Thanks Daniel. I can understand that the keys will not be in sorted order but what I am trying to understanding is whether the functions are passed values in sorted order in a given partition. For example: sc.parallelize(1 to 8).map(i = (1, i)).sortBy(t = t._2).foldByKey(0)((a, b) = b).collect res0: Array[(Int, Int)] = Array((1,8)) The fold always given me last value as 8 which suggests values preserve sorting earlier defined in stage in DAG? On Wed Nov 19 2014 at 18:10:11 Daniel Darabos daniel.dara...@lynxanalytics.com wrote: Akhil, I think Aniket uses the word persisted in a different way than what you mean. I.e. not in the RDD.persist() way. Aniket asks if running combineByKey on a sorted RDD will result in a sorted RDD. (I.e. the sorting is preserved.) I think the answer is no. combineByKey uses AppendOnlyMap, which is a hashmap. That will shuffle your keys. You can quickly verify it in spark-shell: scala sc.parallelize(7 to 8).map(_ - 1).reduceByKey(_ + _).collect res0: Array[(Int, Int)] = Array((8,1), (7,1)) (The initial size of the AppendOnlyMap seems to be 8, so 8 is the first number that demonstrates this.) On Wed, Nov 19, 2014 at 9:05 AM, Akhil Das ak...@sigmoidanalytics.com wrote: If something is persisted you can easily see them under the Storage tab in the web ui. Thanks Best Regards On Tue, Nov 18, 2014 at 7:26 PM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: I am trying to figure out if sorting is persisted after applying Pair RDD transformations and I am not able to decisively tell after reading the documentation. For example: val numbers = .. // RDD of numbers val pairedNumbers = numbers.map(number = (number % 100, number)) val sortedPairedNumbers = pairedNumbers.sortBy(pairedNumber = pairedNumber._2) // Sort by values in the pair val aggregates = sortedPairedNumbers.combineByKey(..) In this example, will the combine functions see values in sorted order? What if I had done groupByKey and then combineByKey? What transformations can unsort an already sorted data?
Debugging spark java application
Hello experts, Is there an easy way to debug a Spark Java application? I'm putting debug logs in the map's function but there aren't any logs on the console. Also, can I include my custom jars while launching spark-shell and do my PoC there? This might be a naive question but any help here is appreciated.
GraphX bug re-opened
We keep running into https://issues.apache.org/jira/browse/SPARK-2823 when trying to use GraphX. The cost of repartitioning the data is really high for us (lots of network traffic) which is killing the job performance. I understand the bug was reverted to stabilize unit tests, but frankly it makes it very hard to tune Spark applications with the limits this puts on someone. What is the process to get fixing this prioritized if we do not have the cycles to do it ourselves?
can not found scala.reflect related methods when running spark program
Hi, I wrote the simple Spark code below and hit a runtime issue where the system seems unable to find some methods of the scala-reflect library. package org.apache.spark.examples import scala.io.Source import scala.reflect._ import scala.reflect.api.JavaUniverse import scala.reflect.runtime.universe import org.apache.spark.SparkContext._ import org.apache.spark.{SparkConf, SparkContext} import scala.reflect.NameTransformer import scala.reflect.NameTransformer._ object test { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("test") val ctx = new SparkContext(sparkConf) val lines = ctx.textFile("data") val rules = lines.map { s => val parts = s.split(","); val part0 = parts(0); (part0, s) }.distinct().groupByKey().cache() val ru = scala.reflect.runtime.universe println("End."); ctx.stop() } } After compiling the above code, I used the command below to submit the application. In the submit command, I used --driver-class-path to include the path of scala-reflect.jar on the classpath: $ spark-submit --master local --class org.apache.spark.examples.test --driver-class-path /MY_GRAPH_PATH/lib/graph-core_2.11-1.9.0.jar:/MY_SPARK_PATH/lib/spark-assembly-1.1.0-hadoop2.4.0.jar:/MY_SCALA_PATH/lib/scala-reflect.jar /MY_APP_PATH/test/bin/test.jar Then I got the following failure: Exception in thread "main" java.lang.NoSuchMethodError: scala.reflect.NameTransformer$.LOCAL_SUFFIX_STRING()Ljava/lang/String; at scala.reflect.internal.StdNames$CommonNames.<init>(StdNames.scala:97) at scala.reflect.internal.StdNames$Keywords.<init>(StdNames.scala:203) at scala.reflect.internal.StdNames$TermNames.<init>(StdNames.scala:288) at scala.reflect.internal.StdNames$nme$.<init>(StdNames.scala:1045) at scala.reflect.internal.SymbolTable.nme$lzycompute(SymbolTable.scala:16) at scala.reflect.internal.SymbolTable.nme(SymbolTable.scala:16) at scala.reflect.internal.StdNames$class.$init$(StdNames.scala:1041) at scala.reflect.internal.SymbolTable.<init>(SymbolTable.scala:16) at scala.reflect.runtime.JavaUniverse.<init>(JavaUniverse.scala:16) at scala.reflect.runtime.package$.universe$lzycompute(package.scala:17) at scala.reflect.runtime.package$.universe(package.scala:17) at RouteChecker$.main(test.scala:32) at RouteChecker.main(test.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) However, when I commented out the Spark-related code from the above program and compiled and ran it with plain scala, the program worked fine. Does anyone know about this? Thank you very much. Best Regards, Dingfei
Cannot access data after a join (error: value _1 is not a member of Product with Serializable)
I joined two datasets together, and my resulting logs look like this: (975894369,((72364,20141112T170627,web,MEMPHIS,AR,US,Central),(Male,John,Smith))) (253142991,((30058,20141112T171246,web,ATLANTA16,GA,US,Southeast),(Male,Bob,Jones))) (295305425,((28110,20141112T170454,iph,CHARLOTTE2,NC,US,Southeast),(Female,Mary,Williams))) When I try to access the newly-joined data with JoinedInv.map(line = line._2._2._1) I get the following error: [ERROR] error: value _1 is not a member of Product with Serializable [INFO] val getOne = JoinedInv.map(line = line._2._2._1) [INFO] ^ [ERROR] error: value foreach is not a member of Array[Nothing] [INFO] getOne.take(10).foreach(println) [INFO]^ It looks like there are some rows where a JOIN did not occur (no key match in the joined dataset), but because I can't access line._2._2._1 I don't know of a way to check for that. I can access line._2._2 but line._2._2 does not have the length attribute. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-access-data-after-a-join-error-value-1-is-not-a-member-of-Product-with-Serializable-tp19272.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Why is ALS class serializable ?
When a field of an object is enclosed in a closure, the object itself is also enclosed automatically, thus the object need to be serializable. On 11/19/14 6:39 PM, Hao Ren wrote: Hi, When reading through ALS code, I find that: class ALS private ( private var numUserBlocks: Int, private var numProductBlocks: Int, private var rank: Int, private var iterations: Int, private var lambda: Double, private var implicitPrefs: Boolean, private var alpha: Double, private var seed: Long = System.nanoTime() ) extends *Serializable *with Logging and why should ALS extend Serializable ? if not, there will be an Exception: task is not serializable, ALS is not serializable. I did not find any closure functions in which ALS is referenced. Any idea ? Thx. Hao -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-is-ALS-class-serializable-tp19262.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
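A small illustration of this point (not taken from ALS itself; the classes here are made up): referencing a field inside a closure really references this.field, so the whole enclosing object is captured and must be serializable, whereas copying the field into a local variable avoids that.

class Scaler(val factor: Double) extends Serializable {
  // "factor" below really means "this.factor", so the whole Scaler instance
  // is pulled into the task closure -- hence the need for Serializable.
  def scale(rdd: org.apache.spark.rdd.RDD[Double]): org.apache.spark.rdd.RDD[Double] =
    rdd.map(_ * factor)
}

class LocalCopyScaler(val factor: Double) {
  // Copying the field into a local val means only the Double is captured,
  // so this class does not need to be Serializable.
  def scale(rdd: org.apache.spark.rdd.RDD[Double]): org.apache.spark.rdd.RDD[Double] = {
    val f = factor
    rdd.map(_ * f)
  }
}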
Re: Cannot access data after a join (error: value _1 is not a member of Product with Serializable)
can you please post the full source of your code and some sample data to run it on ? 2014-11-19 16:23 GMT+01:00 YaoPau jonrgr...@gmail.com: I joined two datasets together, and my resulting logs look like this: (975894369,((72364,20141112T170627,web,MEMPHIS,AR,US,Central),(Male,John,Smith))) (253142991,((30058,20141112T171246,web,ATLANTA16,GA,US,Southeast),(Male,Bob,Jones))) (295305425,((28110,20141112T170454,iph,CHARLOTTE2,NC,US,Southeast),(Female,Mary,Williams))) When I try to access the newly-joined data with JoinedInv.map(line = line._2._2._1) I get the following error: [ERROR] error: value _1 is not a member of Product with Serializable [INFO] val getOne = JoinedInv.map(line = line._2._2._1) [INFO] ^ [ERROR] error: value foreach is not a member of Array[Nothing] [INFO] getOne.take(10).foreach(println) [INFO]^ It looks like there are some rows where a JOIN did not occur (no key match in the joined dataset), but because I can't access line._2._2._1 I don't know of a way to check for that. I can access line._2._2 but line._2._2 does not have the length attribute. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-access-data-after-a-join-error-value-1-is-not-a-member-of-Product-with-Serializable-tp19272.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Is sorting persisted after pair rdd transformations?
Thanks Daniel :-). It seems to make sense and something I was hoping for. I will proceed with this assumption and report back if I see any anomalies. On Wed Nov 19 2014 at 19:30:02 Daniel Darabos daniel.dara...@lynxanalytics.com wrote: Ah, so I misunderstood you too :). My reading of org/ apache/spark/Aggregator.scala is that your function will always see the items in the order that they are in the input RDD. An RDD partition is always accessed as an iterator, so it will not be read out of order. On Wed, Nov 19, 2014 at 2:28 PM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: Thanks Daniel. I can understand that the keys will not be in sorted order but what I am trying to understanding is whether the functions are passed values in sorted order in a given partition. For example: sc.parallelize(1 to 8).map(i = (1, i)).sortBy(t = t._2).foldByKey(0)((a, b) = b).collect res0: Array[(Int, Int)] = Array((1,8)) The fold always given me last value as 8 which suggests values preserve sorting earlier defined in stage in DAG? On Wed Nov 19 2014 at 18:10:11 Daniel Darabos daniel.dara...@lynxanalytics.com wrote: Akhil, I think Aniket uses the word persisted in a different way than what you mean. I.e. not in the RDD.persist() way. Aniket asks if running combineByKey on a sorted RDD will result in a sorted RDD. (I.e. the sorting is preserved.) I think the answer is no. combineByKey uses AppendOnlyMap, which is a hashmap. That will shuffle your keys. You can quickly verify it in spark-shell: scala sc.parallelize(7 to 8).map(_ - 1).reduceByKey(_ + _).collect res0: Array[(Int, Int)] = Array((8,1), (7,1)) (The initial size of the AppendOnlyMap seems to be 8, so 8 is the first number that demonstrates this.) On Wed, Nov 19, 2014 at 9:05 AM, Akhil Das ak...@sigmoidanalytics.com wrote: If something is persisted you can easily see them under the Storage tab in the web ui. Thanks Best Regards On Tue, Nov 18, 2014 at 7:26 PM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: I am trying to figure out if sorting is persisted after applying Pair RDD transformations and I am not able to decisively tell after reading the documentation. For example: val numbers = .. // RDD of numbers val pairedNumbers = numbers.map(number = (number % 100, number)) val sortedPairedNumbers = pairedNumbers.sortBy(pairedNumber = pairedNumber._2) // Sort by values in the pair val aggregates = sortedPairedNumbers.combineByKey(..) In this example, will the combine functions see values in sorted order? What if I had done groupByKey and then combineByKey? What transformations can unsort an already sorted data?
Re: Getting spark job progress programmatically
I have for now submitted a JIRA ticket @ https://issues.apache.org/jira/browse/SPARK-4473. I will collate all my experiences ( hacks) and submit them as a feature request for public API. On Tue Nov 18 2014 at 20:35:00 andy petrella andy.petre...@gmail.com wrote: yep, we should also propose to add this stuffs in the public API. Any other ideas? On Tue Nov 18 2014 at 4:03:35 PM Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: Thanks Andy. This is very useful. This gives me all active stages their percentage completion but I am unable to tie stages to job group (or specific job). I looked at Spark's code and to me, it seems org.apache.spark.scheduler.ActiveJob's group ID should get propagated to StageInfo (possibly in the StageInfo.fromStage method). For now, I will have to write my own version JobProgressListener that stores stageId to group Id mapping. I will submit a JIRA ticket and seek spark dev's opinion on this. Many thanks for your prompt help Andy. Thanks, Aniket On Tue Nov 18 2014 at 19:40:06 andy petrella andy.petre...@gmail.com wrote: I started some quick hack for that in the notebook, you can head to: https://github.com/andypetrella/spark-notebook/ blob/master/common/src/main/scala/notebook/front/widgets/SparkInfo.scala On Tue Nov 18 2014 at 2:44:48 PM Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: I am writing yet another Spark job server and have been able to submit jobs and return/save results. I let multiple jobs use the same spark context but I set job group while firing each job so that I can in future cancel jobs. Further, what I deserve to do is provide some kind of status update/progress on running jobs (a % completion but be awesome) but I am unable to figure out appropriate spark API to use. I do however see status reporting in spark UI so there must be a way to get status of various stages per job group. Any hints on what APIs should I look at?
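For reference, a rough sketch of the kind of custom listener described above, written against the Spark 1.1 listener API as I understand it (the property name spark.jobGroup.id is what SparkContext.setJobGroup sets; everything else here is illustrative): it records which stages belong to which job group and counts finished tasks so a rough percentage can be derived per group.

import scala.collection.mutable
import org.apache.spark.scheduler._

class GroupProgressListener extends SparkListener {
  private val stageToGroup  = mutable.Map[Int, String]()
  private val totalTasks    = mutable.Map[Int, Int]()
  private val finishedTasks = mutable.Map[Int, Int]()

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    // The job group set via sc.setJobGroup travels in the job's properties
    val group = Option(jobStart.properties)
      .flatMap(p => Option(p.getProperty("spark.jobGroup.id")))
      .getOrElse("default")
    jobStart.stageIds.foreach(id => stageToGroup(id) = group)
  }

  override def onStageSubmitted(submitted: SparkListenerStageSubmitted): Unit =
    totalTasks(submitted.stageInfo.stageId) = submitted.stageInfo.numTasks

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val id = taskEnd.stageId
    finishedTasks(id) = finishedTasks.getOrElse(id, 0) + 1
  }

  // Fraction of tasks finished across all stages seen for this job group
  def progress(group: String): Double = {
    val stages = stageToGroup.collect { case (id, g) if g == group => id }
    val total  = stages.map(totalTasks.getOrElse(_, 0)).sum
    if (total == 0) 0.0
    else stages.map(finishedTasks.getOrElse(_, 0)).sum.toDouble / total
  }
}

// Registered on the driver with: sc.addSparkListener(new GroupProgressListener())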
Re: Getting spark job progress programmatically
Thanks for pointing this out Mark. Had totally missed the existing JIRA items On Wed Nov 19 2014 at 21:42:19 Mark Hamstra m...@clearstorydata.com wrote: This is already being covered by SPARK-2321 and SPARK-4145. There are pull requests that are already merged or already very far along -- e.g., https://github.com/apache/spark/pull/3009 If there is anything that needs to be added, please add it to those issues or PRs. On Wed, Nov 19, 2014 at 7:55 AM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: I have for now submitted a JIRA ticket @ https://issues.apache.org/jira/browse/SPARK-4473. I will collate all my experiences ( hacks) and submit them as a feature request for public API. On Tue Nov 18 2014 at 20:35:00 andy petrella andy.petre...@gmail.com wrote: yep, we should also propose to add this stuffs in the public API. Any other ideas? On Tue Nov 18 2014 at 4:03:35 PM Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: Thanks Andy. This is very useful. This gives me all active stages their percentage completion but I am unable to tie stages to job group (or specific job). I looked at Spark's code and to me, it seems org.apache.spark.scheduler.ActiveJob's group ID should get propagated to StageInfo (possibly in the StageInfo.fromStage method). For now, I will have to write my own version JobProgressListener that stores stageId to group Id mapping. I will submit a JIRA ticket and seek spark dev's opinion on this. Many thanks for your prompt help Andy. Thanks, Aniket On Tue Nov 18 2014 at 19:40:06 andy petrella andy.petre...@gmail.com wrote: I started some quick hack for that in the notebook, you can head to: https://github.com/andypetrella/spark-notebook/ blob/master/common/src/main/scala/notebook/front/widgets/ SparkInfo.scala On Tue Nov 18 2014 at 2:44:48 PM Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: I am writing yet another Spark job server and have been able to submit jobs and return/save results. I let multiple jobs use the same spark context but I set job group while firing each job so that I can in future cancel jobs. Further, what I deserve to do is provide some kind of status update/progress on running jobs (a % completion but be awesome) but I am unable to figure out appropriate spark API to use. I do however see status reporting in spark UI so there must be a way to get status of various stages per job group. Any hints on what APIs should I look at?
Re: Getting spark job progress programmatically
This is already being covered by SPARK-2321 and SPARK-4145. There are pull requests that are already merged or already very far along -- e.g., https://github.com/apache/spark/pull/3009 If there is anything that needs to be added, please add it to those issues or PRs. On Wed, Nov 19, 2014 at 7:55 AM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: I have for now submitted a JIRA ticket @ https://issues.apache.org/jira/browse/SPARK-4473. I will collate all my experiences ( hacks) and submit them as a feature request for public API. On Tue Nov 18 2014 at 20:35:00 andy petrella andy.petre...@gmail.com wrote: yep, we should also propose to add this stuffs in the public API. Any other ideas? On Tue Nov 18 2014 at 4:03:35 PM Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: Thanks Andy. This is very useful. This gives me all active stages their percentage completion but I am unable to tie stages to job group (or specific job). I looked at Spark's code and to me, it seems org.apache.spark.scheduler.ActiveJob's group ID should get propagated to StageInfo (possibly in the StageInfo.fromStage method). For now, I will have to write my own version JobProgressListener that stores stageId to group Id mapping. I will submit a JIRA ticket and seek spark dev's opinion on this. Many thanks for your prompt help Andy. Thanks, Aniket On Tue Nov 18 2014 at 19:40:06 andy petrella andy.petre...@gmail.com wrote: I started some quick hack for that in the notebook, you can head to: https://github.com/andypetrella/spark-notebook/ blob/master/common/src/main/scala/notebook/front/widgets/ SparkInfo.scala On Tue Nov 18 2014 at 2:44:48 PM Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: I am writing yet another Spark job server and have been able to submit jobs and return/save results. I let multiple jobs use the same spark context but I set job group while firing each job so that I can in future cancel jobs. Further, what I deserve to do is provide some kind of status update/progress on running jobs (a % completion but be awesome) but I am unable to figure out appropriate spark API to use. I do however see status reporting in spark UI so there must be a way to get status of various stages per job group. Any hints on what APIs should I look at?
Spark Streaming with Flume or Kafka?
Hi, I'm starting with Spark and I'm just trying to understand: if I want to use Spark Streaming, should I feed it with Flume or Kafka? I think there's no official sink for Flume to Spark Streaming, and it seems that Kafka fits better since it gives you readability. Could someone give a good scenario for each alternative? When would it make sense to use Kafka and when Flume for Spark Streaming?
Re: Spark on YARN
Hi all! Thanks for answering! @Sean, I tried to run with 30 executor cores, and 1 machine was still not processing. @Vanzin, I checked the RM's web UI, and all nodes were detected and RUNNING. The interesting fact is that the available memory and available cores of 1 node were different from the other 2, with just 1 available core and 1 GB of available RAM. @All, I created a new cluster with 10 slaves and 1 master, and now 9 of my slaves are working and 1 is still not processing. It's fine by me! I'm just wondering why YARN is doing it... Does anyone know the answer? 2014-11-18 16:18 GMT-02:00 Sean Owen so...@cloudera.com: My guess is you're asking for all cores of all machines but the driver needs at least one core, so one executor is unable to find a machine to fit on. On Nov 18, 2014 7:04 PM, Alan Prando a...@scanboo.com.br wrote: Hi Folks! I'm running Spark on a YARN cluster installed with Cloudera Manager Express. The cluster has 1 master and 3 slaves, each machine with 32 cores and 64G RAM. My Spark job is working fine, however it seems that just 2 of the 3 slaves are working (htop shows 2 slaves working 100% on 32 cores, and 1 slave without any processing). I'm using this command: ./spark-submit --master yarn --num-executors 3 --executor-cores 32 --executor-memory 32g feature_extractor.py -r 390 Additionally, Spark's log shows communication with 2 slaves only: 14/11/18 17:19:38 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-13-180.ec2.internal:33177/user/Executor#-113177469] with ID 1 14/11/18 17:19:38 INFO RackResolver: Resolved ip-172-31-13-180.ec2.internal to /default 14/11/18 17:19:38 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-13-179.ec2.internal:51859/user/Executor#-323896724] with ID 2 14/11/18 17:19:38 INFO RackResolver: Resolved ip-172-31-13-179.ec2.internal to /default 14/11/18 17:19:38 INFO BlockManagerMasterActor: Registering block manager ip-172-31-13-180.ec2.internal:50959 with 16.6 GB RAM 14/11/18 17:19:39 INFO BlockManagerMasterActor: Registering block manager ip-172-31-13-179.ec2.internal:53557 with 16.6 GB RAM 14/11/18 17:19:51 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 3(ms) Is there a configuration to run Spark jobs on a YARN cluster using all slaves? Thanks in advance! =] --- Regards Alan Vidotti Prando.
tableau spark sql cassandra
Hello! I'm working on a POC with Spark SQL, where I’m trying to get data from Cassandra into Tableau using Spark SQL. Here is the stack:
- cassandra (v2.1)
- spark SQL (pre-built v1.1, hadoop v2.4)
- cassandra / spark sql connector (https://github.com/datastax/spark-cassandra-connector)
- hive
- mysql
- hive / mysql connector
- hive / cassandra handler (https://github.com/tuplejump/cash/tree/master/cassandra-handler)
- tableau
- tableau / spark sql connector
I get an exception in spark-sql (bin/spark-sql) when trying to query the Cassandra table (java.lang.InstantiationError: org.apache.hadoop.mapreduce.JobContext); it looks like a missing Hadoop dependency. Showing tables or describing them works fine. Do you know how to solve this without Hadoop? Is Hive a dependency of Spark SQL? Best, Jerome
[SQL] HiveThriftServer2 failure detection
Hi all, I am running HiveThriftServer2 and noticed that the process stays up even though there is no driver connected to the Spark master. I started the server via sbin/start-thriftserver and my namenodes are currently not operational. I can see from the log that there was an error on startup: 14/11/19 16:32:58 ERROR HiveThriftServer2: Error starting HiveThriftServer2 and that the driver shut down as expected: 14/11/19 16:32:59 INFO SparkUI: Stopped Spark web UI at http://myip:4040 14/11/19 16:32:59 INFO DAGScheduler: Stopping DAGScheduler 14/11/19 16:32:59 INFO SparkDeploySchedulerBackend: Shutting down all executors 14/11/19 16:32:59 INFO SparkDeploySchedulerBackend: Asking each executor to shut down 14/11/19 16:33:00 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped! 14/11/19 16:33:00 INFO MemoryStore: MemoryStore cleared 14/11/19 16:33:00 INFO BlockManager: BlockManager stopped 14/11/19 16:33:00 INFO BlockManagerMaster: BlockManagerMaster stopped 14/11/19 16:33:00 INFO SparkContext: Successfully stopped SparkContext However, when I try to run start-thriftserver.sh again I see an error message that the process is already running and indeed there is a process with that PID: root 32334 1 0 16:32 ?00:00:00 /usr/local/bin/java org.apache.spark.deploy.SparkSubmitDriverBootstrapper --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 --master spark://myip:7077 --conf -spark.executor.extraJavaOptions=-verbose:gc -XX:-PrintGCDetails -XX:+PrintGCTimeStamps spark-internal --hiveconf hive.root.logger=INFO,console Is this a bug or design decision -- I am upgrading from Shark and we had scripts that monitor the driver and restart on failure. Here it seems that we would not be able to restart even though the driver died?
Re: Debugging spark java application
For debugging you can refer to these two threads: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-you-hit-breakpoints-using-IntelliJ-In-functions-used-by-an-RDD-td12754.html http://mail-archives.apache.org/mod_mbox/spark-user/201410.mbox/%3ccahuq+_ygfioj2aa3e2zsh7zfsv_z-wsorhvbipahxjlm2fj...@mail.gmail.com%3E If you put the logs in the map function and run your code in standalone mode, then those logs will be in your worker directory; they will not be displayed in the driver's console. For adding the jars while launching spark-shell, you could add those jars to SPARK_CLASSPATH in the conf/spark-env.sh file, or you could call sc.addJar("/path/to/jar"). Thanks Best Regards On Wed, Nov 19, 2014 at 7:58 PM, Mukesh Jha mukh@gmail.com wrote: Hello experts, Is there an easy way to debug a Spark Java application? I'm putting debug logs in the map's function but there aren't any logs on the console. Also, can I include my custom jars while launching spark-shell and do my PoC there? This might be a naive question but any help here is appreciated.
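As a rough illustration of the point about executor-side logs (the paths and logger name here are made up): anything logged inside the lambda runs on the executors, so the messages end up in each worker's stderr/stdout under the application's work directory, not on the driver console.

import org.apache.log4j.Logger

sc.addJar("/path/to/my-custom.jar")   // ship an extra jar to the executors

val lengths = sc.textFile("/data/input.txt").map { line =>
  // Created inside the task, so this logger lives on the executor
  val log = Logger.getLogger("my.map.debug")
  log.info("processing line of length " + line.length)
  line.length
}
lengths.count()   // trigger the job, then check the worker logs for the messages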
Re: [SQL] HiveThriftServer2 failure detection
This is not by design. Can you please file a JIRA? On Wed, Nov 19, 2014 at 9:19 AM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Hi all, I am running HiveThriftServer2 and noticed that the process stays up even though there is no driver connected to the Spark master. I started the server via sbin/start-thriftserver and my namenodes are currently not operational. I can see from the log that there was an error on startup: 14/11/19 16:32:58 ERROR HiveThriftServer2: Error starting HiveThriftServer2 and that the driver shut down as expected: 14/11/19 16:32:59 INFO SparkUI: Stopped Spark web UI at http://myip:4040 14/11/19 16:32:59 INFO DAGScheduler: Stopping DAGScheduler 14/11/19 16:32:59 INFO SparkDeploySchedulerBackend: Shutting down all executors 14/11/19 16:32:59 INFO SparkDeploySchedulerBackend: Asking each executor to shut down 14/11/19 16:33:00 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped! 14/11/19 16:33:00 INFO MemoryStore: MemoryStore cleared 14/11/19 16:33:00 INFO BlockManager: BlockManager stopped 14/11/19 16:33:00 INFO BlockManagerMaster: BlockManagerMaster stopped 14/11/19 16:33:00 INFO SparkContext: Successfully stopped SparkContext However, when I try to run start-thriftserver.sh again I see an error message that the process is already running and indeed there is a process with that PID: root 32334 1 0 16:32 ?00:00:00 /usr/local/bin/java org.apache.spark.deploy.SparkSubmitDriverBootstrapper --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 --master spark://myip:7077 --conf -spark.executor.extraJavaOptions=-verbose:gc -XX:-PrintGCDetails -XX:+PrintGCTimeStamps spark-internal --hiveconf hive.root.logger=INFO,console Is this a bug or design decision -- I am upgrading from Shark and we had scripts that monitor the driver and restart on failure. Here it seems that we would not be able to restart even though the driver died?
Re: SparkSQL and Hive/Hive metastore testing - LocalHiveContext
On Tue, Nov 18, 2014 at 10:34 PM, Night Wolf nightwolf...@gmail.com wrote: Is there a better way to mock this out and test Hive/metastore with SparkSQL? I would use TestHive which creates a fresh metastore each time it is invoked.
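For reference, a rough sketch of what using TestHive might look like in a test (the suite and table names are invented; TestHive lives in org.apache.spark.sql.hive.test and is backed by a local, throwaway metastore):

import org.apache.spark.sql.hive.test.TestHive

class HiveQuerySuite extends org.scalatest.FunSuite {
  test("aggregate over a scratch table") {
    TestHive.sql("CREATE TABLE IF NOT EXISTS events (id INT, name STRING)")
    val counts = TestHive.sql("SELECT COUNT(*) FROM events").collect()
    assert(counts.nonEmpty)
    TestHive.reset()   // drop temporary tables/state between tests
  }
}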
Shuffle Intensive Job: sendMessageReliably failed because ack was not received within 60 sec
Has anyone else received this type of error? We are not sure what the issue is nor how to correct it to get our job to complete...
Re: Merging Parquet Files
On Wed, Nov 19, 2014 at 12:41 AM, Daniel Haviv danielru...@gmail.com wrote: Another problem I have is that I get a lot of small json files and as a result a lot of small parquet files, I'd like to merge the json files into a few parquet files.. how I do that? You can use `coalesce` on any RDD to merge files.
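A small sketch of that suggestion applied to the original question (paths and the partition count are illustrative, and this assumes coalesce on a SchemaRDD keeps the schema): read the many small JSON files, coalesce to a handful of partitions, and write the result as Parquet so only a few files are produced.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val requests = sqlContext.jsonFile("/requests")      // many small input files
val merged = requests.coalesce(4)                    // at most 4 output partitions/files
merged.saveAsParquetFile("/requests_parquet_merged")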
Re: How to apply schema to queried data from Hive before saving it as parquet file?
I am not very familiar with the JSONSerDe for Hive, but in general you should not need to manually create a schema for data that is loaded from Hive. You should just be able to call saveAsParquetFile on any SchemaRDD that is returned from hctx.sql(...). I'd also suggest you check out the jsonFile/jsonRDD methods that are available on HiveContext. On Wed, Nov 19, 2014 at 1:34 AM, akshayhazari akshayhaz...@gmail.com wrote: The code below contains a part which creates a table in Hive from data, and another part below it which creates a schema. Now, if I try to save the queried data as a Parquet file, hctx.sql("Select * from sparkHive1") returns me a SchemaRDD which contains records from the table: hctx.sql("Select * from sparkHive1").saveAsParquetFile("/home/hduser/Documents/Credentials/Newest_Credentials_AX/Songs/spark-1.1.0/HiveOP"); As per the code in the following link, before saving the file as a Parquet file the sqlContext is applied with a schema. How can I do that (save as a Parquet file) when I am using a Hive context to fetch data? http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files Any help please. -- HiveContext hctx = new HiveContext(sctx); // sctx is the SparkContext hctx.sql("Select * from sparkHive1") hctx.sql("ADD JAR /home/hduser/BIGDATA_STUFF/Java_Hive2/hive-json-serde-0.2.jar"); hctx.sql("Create table if not exists sparkHive1(id INT,name STRING,score INT) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.JsonSerde'"); hctx.sql("Load data local inpath '/home/hduser/Documents/Credentials/Newest_Credentials_AX/Songs/spark-1.1.0/ip3.json' into table sparkHive1"); String schemaString = "id name score"; List<StructField> fields = new ArrayList<StructField>(); for (String fieldName : schemaString.split(" ")) { if (fieldName.contains("name")) fields.add(DataType.createStructField(fieldName, DataType.StringType, true)); else fields.add(DataType.createStructField(fieldName, DataType.IntegerType, true)); } StructType schema = DataType.createStructType(fields); // How can I apply the schema before saving as a Parquet file? hctx.sql("Select * from sparkHive1").saveAsParquetFile("/home/hduser/Documents/Credentials/Newest_Credentials_AX/Songs/spark-1.1.0/HiveOP");
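A condensed sketch of the suggestion in the reply, using the table name from the question and a shortened output path for illustration: the SchemaRDD returned by sql() already carries the Hive table's schema, so it can be written to Parquet directly without a hand-built StructType.

val hctx = new org.apache.spark.sql.hive.HiveContext(sc)
val result = hctx.sql("SELECT * FROM sparkHive1")   // SchemaRDD with the Hive schema attached
result.saveAsParquetFile("/home/hduser/HiveOP")     // path shortened for illustration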
Re: tableau spark sql cassandra
The whole stacktrack/exception would be helpful. Hive is an optional dependency of Spark SQL, but you will need to include it if you are planning to use the thrift server to connect to Tableau. You can enable it by add -Phive when you build Spark. You might also try asking on the cassandra mailing list as there could be something wrong with your configuration there. On Wed, Nov 19, 2014 at 8:40 AM, jererc jer...@gmail.com wrote: Hello! I'm working on a POC with Spark SQL, where I’m trying to get data from Cassandra into Tableau using Spark SQL. Here is the stack: - cassandra (v2.1) - spark SQL (pre build v1.1 hadoop v2.4) - cassandra / spark sql connector (https://github.com/datastax/spark-cassandra-connector) - hive - mysql - hive / mysql connector - hive / cassandra handler (https://github.com/tuplejump/cash/tree/master/cassandra-handler) - tableau - tableau / spark sql connector I get an exception in spark-sql (bin/spark-sql) when trying to query the cassandra table (java.lang.InstantiationError: org.apache.hadoop.mapreduce.JobContext), it looks like a missing hadoop dependency; showing tables or describing them work fine. Do you know how to solve this without of hadoop? Is Hive a dependency in Spark SQL? Best, Jerome -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/tableau-spark-sql-cassandra-tp19282.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Shuffle Intensive Job: sendMessageReliably failed because ack was not received within 60 sec
That error can mean a whole bunch of things (and we've been working in recently to make it more descriptive). Often the actual cause is in the executor logs. On Wed, Nov 19, 2014 at 10:50 AM, Gary Malouf malouf.g...@gmail.com wrote: Has anyone else received this type of error? We are not sure what the issue is nor how to correct it to get our job to complete...
Re: Converting a json struct to map
You can override the schema inference by passing a schema as the second argument to jsonRDD, however thats not a super elegant solution. We are considering one option to make this easier here: https://issues.apache.org/jira/browse/SPARK-4476 On Tue, Nov 18, 2014 at 11:06 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Something like this? val map_rdd = json_rdd.map(json = { val mapper = new ObjectMapper() with ScalaObjectMapper mapper.registerModule(DefaultScalaModule) val myMap = mapper.readValue[Map[String,String]](json) myMap }) Thanks Best Regards On Wed, Nov 19, 2014 at 11:01 AM, Daniel Haviv danielru...@gmail.com wrote: Hi, I'm loading a json file into a RDD and then save that RDD as parquet. One of the fields is a map of keys and values but it is being translated and stored as a struct. How can I convert the field into a map? Thanks, Daniel
Re: [SQL] HiveThriftServer2 failure detection
https://issues.apache.org/jira/browse/SPARK-4497 On Wed, Nov 19, 2014 at 1:48 PM, Michael Armbrust mich...@databricks.com wrote: This is not by design. Can you please file a JIRA? On Wed, Nov 19, 2014 at 9:19 AM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Hi all, I am running HiveThriftServer2 and noticed that the process stays up even though there is no driver connected to the Spark master. I started the server via sbin/start-thriftserver and my namenodes are currently not operational. I can see from the log that there was an error on startup: 14/11/19 16:32:58 ERROR HiveThriftServer2: Error starting HiveThriftServer2 and that the driver shut down as expected: 14/11/19 16:32:59 INFO SparkUI: Stopped Spark web UI at http://myip:4040 14/11/19 16:32:59 INFO DAGScheduler: Stopping DAGScheduler 14/11/19 16:32:59 INFO SparkDeploySchedulerBackend: Shutting down all executors 14/11/19 16:32:59 INFO SparkDeploySchedulerBackend: Asking each executor to shut down 14/11/19 16:33:00 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped! 14/11/19 16:33:00 INFO MemoryStore: MemoryStore cleared 14/11/19 16:33:00 INFO BlockManager: BlockManager stopped 14/11/19 16:33:00 INFO BlockManagerMaster: BlockManagerMaster stopped 14/11/19 16:33:00 INFO SparkContext: Successfully stopped SparkContext However, when I try to run start-thriftserver.sh again I see an error message that the process is already running and indeed there is a process with that PID: root 32334 1 0 16:32 ?00:00:00 /usr/local/bin/java org.apache.spark.deploy.SparkSubmitDriverBootstrapper --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 --master spark://myip:7077 --conf -spark.executor.extraJavaOptions=-verbose:gc -XX:-PrintGCDetails -XX:+PrintGCTimeStamps spark-internal --hiveconf hive.root.logger=INFO,console Is this a bug or design decision -- I am upgrading from Shark and we had scripts that monitor the driver and restart on failure. Here it seems that we would not be able to restart even though the driver died?
Re: Converting a json struct to map
Thank you Michael I will try it out tomorrow Daniel On 19 בנוב׳ 2014, at 21:07, Michael Armbrust mich...@databricks.com wrote: You can override the schema inference by passing a schema as the second argument to jsonRDD, however thats not a super elegant solution. We are considering one option to make this easier here: https://issues.apache.org/jira/browse/SPARK-4476 On Tue, Nov 18, 2014 at 11:06 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Something like this? val map_rdd = json_rdd.map(json = { val mapper = new ObjectMapper() with ScalaObjectMapper mapper.registerModule(DefaultScalaModule) val myMap = mapper.readValue[Map[String,String]](json) myMap }) Thanks Best Regards On Wed, Nov 19, 2014 at 11:01 AM, Daniel Haviv danielru...@gmail.com wrote: Hi, I'm loading a json file into a RDD and then save that RDD as parquet. One of the fields is a map of keys and values but it is being translated and stored as a struct. How can I convert the field into a map? Thanks, Daniel
Re: spark streaming and the spark shell
I am hitting the same issue: after running for some time, if the Spark Streaming job loses or times out the Kafka connection, it just starts to return empty RDDs. Is there a timeline for when this issue will be fixed so that I can plan accordingly? Thanks. Tian
querying data from Cassandra through the Spark SQL Thrift JDBC server
Hi - I was curious if anyone is using the Spark SQL Thrift JDBC server with Cassandra. It would be great be if you could share how you got it working? For example, what config changes have to be done in hive-site.xml, what additional jars are required, etc.? I have a Spark app that can programmatically query data from Cassandra using Spark SQL and Spark-Cassandra-Connector. No problem there, but I couldn't find any documentation for using the Thrift JDBC server for querying data from Cassandra. Thanks, Mohammed
rack-topology.sh no such file or directory
I'm trying to run Spark on YARN on a Hortonworks 2.1.5 cluster. I'm getting this error: 14/11/19 13:46:34 INFO cluster.YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@#/user/Executor#-2027837001] with ID 42 14/11/19 13:46:34 WARN net.ScriptBasedMapping: Exception running /etc/hadoop/conf/rack-topology.sh 10.0.28.130 java.io.IOException: Cannot run program /etc/hadoop/conf/rack-topology.sh (in directory ###): error=2, No such file or directory The rack-topology script is not on the system (find / 2>/dev/null -name rack-topology finds nothing). Any possible solution? Arun Luthra
Re: Converting a json struct to map
Oh, actually, we do not support MapType provided by the schema given to jsonRDD at the moment (my bad..). Daniel, you need to wait for the patch of 4476 (I should have one soon). Thanks, Yin On Wed, Nov 19, 2014 at 2:32 PM, Daniel Haviv danielru...@gmail.com wrote: Thank you Michael I will try it out tomorrow Daniel On 19 בנוב׳ 2014, at 21:07, Michael Armbrust mich...@databricks.com wrote: You can override the schema inference by passing a schema as the second argument to jsonRDD, however thats not a super elegant solution. We are considering one option to make this easier here: https://issues.apache.org/jira/browse/SPARK-4476 On Tue, Nov 18, 2014 at 11:06 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Something like this? val map_rdd = json_rdd.map(json = { val mapper = new ObjectMapper() with ScalaObjectMapper mapper.registerModule(DefaultScalaModule) val myMap = mapper.readValue[Map[String,String]](json) myMap }) Thanks Best Regards On Wed, Nov 19, 2014 at 11:01 AM, Daniel Haviv danielru...@gmail.com wrote: Hi, I'm loading a json file into a RDD and then save that RDD as parquet. One of the fields is a map of keys and values but it is being translated and stored as a struct. How can I convert the field into a map? Thanks, Daniel
Re: rack-topology.sh no such file or directory
Your Hadoop configuration is set to look for this file to determine racks. Is the file present on cluster nodes? If not, look at your hdfs-site.xml and remove the setting for a rack topology script there (or it might be in core-site.xml). Matei On Nov 19, 2014, at 12:13 PM, Arun Luthra arun.lut...@gmail.com wrote: I'm trying to run Spark on YARN on a Hortonworks 2.1.5 cluster. I'm getting this error: 14/11/19 13:46:34 INFO cluster.YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@#/user/Executor#-2027837001] with ID 42 14/11/19 13:46:34 WARN net.ScriptBasedMapping: Exception running /etc/hadoop/conf/rack-topology.sh 10.0.28.130 java.io.IOException: Cannot run program /etc/hadoop/conf/rack-topology.sh (in directory ###): error=2, No such file or directory The rack-topology script is not on the system (find / 2>/dev/null -name rack-topology finds nothing). Any possible solution? Arun Luthra
Re: Spark Streaming with Flume or Kafka?
As of now, you can feed Spark Streaming from both kafka and flume. Currently though there is no API to write data back to either of the two directly. I sent a PR which should eventually add something like this: https://github.com/harishreedharan/spark/blob/Kafka-output/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala that would allow Spark Streaming to write back to Kafka. This will likely be reviewed and committed after 1.2. I would consider writing something similar to push data to Flume as well, if there is a sufficient use-case for it. I have seen people talk about writing back to kafka quite a bit - hence the above patch. Which one is better is upto your use-case and existing infrastructure and preference. Both would work as is, but writing back to Flume would usually be if you want to write to HDFS/HBase/Solr etc -- which you could write back directly from Spark Streaming (of course, there are benefits of writing back using Flume like the additional buffering etc Flume gives), but it is still possible to do so from Spark Streaming itself. But for Kafka, the usual use-case is a variety of custom applications reading the same data -- for which it makes a whole lot of sense to write back to Kafka. An example is to sanitize incoming data in Spark Streaming (from Flume or Kafka or something else) and make it available for a variety of apps via Kafka. Hope this helps! Hari On Wed, Nov 19, 2014 at 8:10 AM, Guillermo Ortiz konstt2...@gmail.com wrote: Hi, I'm starting with Spark and I just trying to understand if I want to use Spark Streaming, should I use to feed it Flume or Kafka? I think there's not a official Sink for Flume to Spark Streaming and it seems that Kafka it fits better since gives you readibility. Could someone give a good scenario for each alternative? When would it make sense to use Kafka and when Flume for Spark Streaming? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming with Flume or Kafka?
Btw, if you want to write to Spark Streaming from Flume -- there is a sink (it is a part of Spark, not Flume). See Approach 2 here: http://spark.apache.org/docs/latest/streaming-flume-integration.html On Wed, Nov 19, 2014 at 12:41 PM, Hari Shreedharan hshreedha...@cloudera.com wrote: As of now, you can feed Spark Streaming from both kafka and flume. Currently though there is no API to write data back to either of the two directly. I sent a PR which should eventually add something like this: https://github.com/harishreedharan/spark/blob/Kafka-output/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala that would allow Spark Streaming to write back to Kafka. This will likely be reviewed and committed after 1.2. I would consider writing something similar to push data to Flume as well, if there is a sufficient use-case for it. I have seen people talk about writing back to kafka quite a bit - hence the above patch. Which one is better is upto your use-case and existing infrastructure and preference. Both would work as is, but writing back to Flume would usually be if you want to write to HDFS/HBase/Solr etc -- which you could write back directly from Spark Streaming (of course, there are benefits of writing back using Flume like the additional buffering etc Flume gives), but it is still possible to do so from Spark Streaming itself. But for Kafka, the usual use-case is a variety of custom applications reading the same data -- for which it makes a whole lot of sense to write back to Kafka. An example is to sanitize incoming data in Spark Streaming (from Flume or Kafka or something else) and make it available for a variety of apps via Kafka. Hope this helps! Hari On Wed, Nov 19, 2014 at 8:10 AM, Guillermo Ortiz konstt2...@gmail.com wrote: Hi, I'm starting with Spark and I just trying to understand if I want to use Spark Streaming, should I use to feed it Flume or Kafka? I think there's not a official Sink for Flume to Spark Streaming and it seems that Kafka it fits better since gives you readibility. Could someone give a good scenario for each alternative? When would it make sense to use Kafka and when Flume for Spark Streaming? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
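For context, a rough sketch of what consuming from each source looked like with the built-in receivers as of Spark 1.1 (the ZooKeeper quorum, consumer group, topic name and the Flume host/port below are placeholders):

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.flume.FlumeUtils

val ssc = new StreamingContext(sc, Seconds(10))

// Kafka receiver: ZooKeeper quorum, consumer group, and topic -> number of receiver threads
val kafkaStream = KafkaUtils.createStream(ssc, "zkhost:2181", "my-consumer-group", Map("events" -> 1))

// Flume push-based receiver: a Flume Avro sink is pointed at this host and port
val flumeStream = FlumeUtils.createStream(ssc, "0.0.0.0", 4545)

kafkaStream.map(_._2).print()                                     // message values
flumeStream.map(e => new String(e.event.getBody.array())).print() // raw event bodies
ssc.start()
ssc.awaitTermination()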
Re: Bug in Accumulators...
I'm running into similar problems with accumulators failing to serialize properly. Are there any examples of accumulators being used in more complex environments than simply initializing them in the same class and then using them in a .foreach() on an RDD referenced a few lines below? From the above looking error, it looks like any scala complexity at all which is added causes the closure cleaner to freak out with accumulators... On Fri, Nov 7, 2014 at 12:12 AM, Aaron Davidson ilike...@gmail.com wrote: This may be due in part to Scala allocating an anonymous inner class in order to execute the for loop. I would expect if you change it to a while loop like var i = 0 while (i 10) { sc.parallelize(Array(1, 2, 3, 4)).foreach(x = accum += x) i += 1 } then the problem may go away. I am not super familiar with the closure cleaner, but I believe that we cannot prune beyond 1 layer of references, so the extra class of nesting may be screwing something up. If this is the case, then I would also expect replacing the accumulator with any other reference to the enclosing scope (such as a broadcast variable) would have the same result. On Fri, Nov 7, 2014 at 12:03 AM, Shixiong Zhu zsxw...@gmail.com wrote: Could you provide all pieces of codes which can reproduce the bug? Here is my test code: import org.apache.spark._ import org.apache.spark.SparkContext._ object SimpleApp { def main(args: Array[String]) { val conf = new SparkConf().setAppName(SimpleApp) val sc = new SparkContext(conf) val accum = sc.accumulator(0) for (i - 1 to 10) { sc.parallelize(Array(1, 2, 3, 4)).foreach(x = accum += x) } sc.stop() } } It works fine both in client and cluster. Since this is a serialization bug, the outer class does matter. Could you provide it? Is there a SparkContext field in the outer class? Best Regards, Shixiong Zhu 2014-10-28 0:28 GMT+08:00 octavian.ganea octavian.ga...@inf.ethz.ch: I am also using spark 1.1.0 and I ran it on a cluster of nodes (it works if I run it in local mode! ) If I put the accumulator inside the for loop, everything will work fine. I guess the bug is that an accumulator can be applied to JUST one RDD. Still another undocumented 'feature' of Spark that no one from the people who maintain Spark is willing to solve or at least to tell us about ... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263p17372.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- -jake
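Spelled out, the workaround Aaron suggests looks roughly like the sketch below (a sketch only; whether it avoids the serialization failure depends on what else the enclosing class captures):

    import org.apache.spark.{SparkConf, SparkContext}

    object AccumWhileLoop {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("AccumWhileLoop"))
        val accum = sc.accumulator(0)
        // A while loop avoids the anonymous inner class that a Scala for-comprehension
        // allocates, which is what seems to trip up the closure cleaner here.
        var i = 0
        while (i < 10) {
          sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
          i += 1
        }
        println(accum.value) // 100 if every increment was applied
        sc.stop()
      }
    }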
Re: Efficient way to split an input data set into different output files
I don't have a solution for you, but it sounds like you might want to follow this issue: SPARK-3533 https://issues.apache.org/jira/browse/SPARK-3533 - Add saveAsTextFileByKey() method to RDDs On Wed Nov 19 2014 at 6:41:11 AM Tom Seddon mr.tom.sed...@gmail.com wrote: I'm trying to set up a PySpark ETL job that takes in JSON log files and spits out fact table files for upload to Redshift. Is there an efficient way to send different event types to different outputs without having to just read the same cached RDD twice? I have my first RDD which is just a json parsed version of the input data, and I need to create a flattened page views dataset off this based on eventType = 'INITIAL', and then a page events dataset from the same RDD based on eventType = 'ADDITIONAL'. Ideally I'd like the output files for both these tables to be written at the same time, so I'm picturing a function with one input RDD in and two RDDs out, or a function utilising two CSV writers. I'm using mapPartitions at the moment to write to files like this: def write_records(records): output = StringIO.StringIO() writer = vlad.CsvUnicodeWriter(output, dialect='excel') for record in records: writer.writerow(record) return [output.getvalue()] and I use this in the call to write the file as follows (pageviews and events get created off the same json parsed RDD by filtering on INITIAL or ADDITIONAL respectively): pageviews.mapPartitions(writeRecords).saveAsTextFile('s3n://output/pageviews/') events.mapPartitions(writeRecords).saveAsTextFile(''s3n://output/events/) Is there a way to change this so that both are written in the same process?
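Until SPARK-3533 (or an equivalent) exists, one workaround often mentioned for this problem is Hadoop's MultipleTextOutputFormat: key every record by its event type, and a single save writes one subdirectory per key. The sketch below is Scala (the thread is PySpark, so treat it purely as an illustration of the idea), and the record/key shapes are assumptions:

    import org.apache.hadoop.io.NullWritable
    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD

    // Routes each key to its own output path under the base directory.
    class KeyBasedOutput extends MultipleTextOutputFormat[Any, Any] {
      override def generateActualKey(key: Any, value: Any): Any =
        NullWritable.get() // drop the key from the written lines
      override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
        key + "/" + name // e.g. INITIAL/part-00000, ADDITIONAL/part-00000
    }

    // `records` is assumed to be (eventType, csvLine) pairs built upstream.
    def writeByEventType(records: RDD[(String, String)], basePath: String): Unit =
      records.saveAsHadoopFile(basePath, classOf[String], classOf[String], classOf[KeyBasedOutput])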
Re: spark-shell giving me error of unread block data
Question ... when you mean different versions, different versions of dependency files? what are the dependency files for spark? On Tue Nov 18 2014 at 5:27:18 PM Anson Abraham anson.abra...@gmail.com wrote: when cdh cluster was running, i did not set up spark role. When I did for the first time, it was working ie, the same load of test file gave me output. But in this case, how can there be different versions? This is all done through cloudera manager parcels how does one find out version installed? I did do an rsync from master to the worker nodes, and that did not help me much. And we're talking about the spark-assembly jar files correct? or is there another set of jar files i should be checking for? On Tue Nov 18 2014 at 5:16:57 PM Ritesh Kumar Singh riteshoneinamill...@gmail.com wrote: It can be a serialization issue. Happens when there are different versions installed on the same system. What do you mean by the first time you installed and tested it out? On Wed, Nov 19, 2014 at 3:29 AM, Anson Abraham anson.abra...@gmail.com wrote: I'm essentially loading a file and saving output to another location: val source = sc.textFile(/tmp/testfile.txt) source.saveAsTextFile(/tmp/testsparkoutput) when i do so, i'm hitting this error: 14/11/18 21:15:08 INFO DAGScheduler: Failed to run saveAsTextFile at console:15 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, cloudera-1.testdomain.net): java.lang.IllegalStateException: unread block data java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode( ObjectInputStream.java:2421) java.io.ObjectInputStream.readObject0(ObjectInputStream. java:1382) java.io.ObjectInputStream.defaultReadFields( ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData( ObjectInputStream.java:1915) java.io.ObjectInputStream.readOrdinaryObject( ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream. java:1350) java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) org.apache.spark.serializer.JavaDeserializationStream. readObject(JavaSerializer.scala:62) org.apache.spark.serializer.JavaSerializerInstance. 
deserialize(JavaSerializer.scala:87) org.apache.spark.executor.Executor$TaskRunner.run( Executor.scala:162) java.util.concurrent.ThreadPoolExecutor.runWorker( ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run( ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$ scheduler$DAGScheduler$$failJobAndIndependentStages( DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply( DAGScheduler.scala:1174) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply( DAGScheduler.scala:1173) at scala.collection.mutable.ResizableArray$class.foreach( ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage( DAGScheduler.scala:1173) at org.apache.spark.scheduler.DAGScheduler$$anonfun$ handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGScheduler$$anonfun$ handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed( DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$ $anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec( AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue. runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker( ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run( ForkJoinWorkerThread.java:107) Cant figure out what the issue is. I'm running in CDH5.2 w/ version of spark being 1.1. The file i'm loading is literally just 7 MB. I thought it was jar files mismatch, but i did a compare and see they're all identical. But seeing as how they were all installed through CDH parcels, not sure how there would be version mismatch on the nodes and master. Oh yeah 1 master node w/ 2 worker nodes and running in standalone not through yarn. So as a just in case, i copied the jars from the master to the 2 worker nodes as just in case, and still same issue. Weird
Can we make EdgeRDD and VertexRDD storage level to MEMORY_AND_DISK?
Hi, I'm running out of memory when I run a GraphX program on a dataset larger than 10 GB. The same data was handled fine by normal Spark operations once I used StorageLevel.MEMORY_AND_DISK. With GraphX I found that only in-memory storage is allowed, because the Graph constructor sets this property by default. When I try to change the storage level to what I need, it isn't allowed and throws an error saying Cannot modify StorageLevel when it is already set. Please help me with these queries: 1. How can I override the current storage level to MEMORY_AND_DISK? 2. If it's not possible through the constructor, what if I modify the Graph.scala class and rebuild it to make it work? If I do that, is there anything else I need to know? Thanks --Harihar -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-we-make-EdgeRDD-and-VertexRDD-storage-level-to-MEMORY-AND-DISK-tp19307.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
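Depending on the Spark version this may not require patching Graph.scala at all: from Spark 1.1 the graph builders take target storage levels for the edge and vertex RDDs. A hedged sketch (parameter names as of 1.1/1.2, so verify against your build; sc, vertexRDD and edgeRDD are assumed to exist):

    import org.apache.spark.graphx.{Graph, GraphLoader}
    import org.apache.spark.storage.StorageLevel

    // From an edge list file on HDFS (path is a placeholder).
    val g1 = GraphLoader.edgeListFile(
      sc, "hdfs:///data/edges.txt",
      edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
      vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)

    // Or from pre-built vertex and edge RDDs.
    val g2 = Graph(vertexRDD, edgeRDD, defaultVertexAttr = 0,
      edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
      vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)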
How to get list of edges between two Vertex ?
Hi, I have a graph where there can be more than one edge between the same two vertices. Now I need to find the top vertex pairs between which the most calls happen; the output should look like (V1, V2, number of edges). So I need to know how to find the total number of edges between just those two vertices. --Harihar -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-list-of-edges-between-two-Vertex-tp19309.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
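A straightforward way to get that, assuming a Graph is already built, is to reduce the edge RDD by its (srcId, dstId) pair and take the largest counts; a sketch:

    import org.apache.spark.SparkContext._
    import org.apache.spark.graphx.{Graph, VertexId}

    // Counts parallel edges per (source, destination) pair and returns the top n pairs.
    // If direction should not matter, key by (min(srcId, dstId), max(srcId, dstId)) instead.
    def topVertexPairs[VD, ED](graph: Graph[VD, ED], n: Int = 10): Array[((VertexId, VertexId), Int)] =
      graph.edges
        .map(e => ((e.srcId, e.dstId), 1))
        .reduceByKey(_ + _)
        .top(n)(Ordering.by[((VertexId, VertexId), Int), Int](_._2))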
Reading nested JSON data with Spark SQL
I have been using Spark SQL to read in JSON data, like so: val myJsonFile = sqc.jsonFile(args(myLocation)) myJsonFile.registerTempTable(myTable) sqc.sql(mySQLQuery).map { row => myFunction(row) } And then in myFunction(row) I can read the various columns with the Row.getX methods. However, these methods only work for basic types (string, int, ...). I was having some trouble reading columns that are arrays or maps (i.e. other JSON objects). I am now using Spark 1.2 from the Cloudera snapshot and I noticed that there is a new method getAs. I was able to use it to read, for example, an array of strings like so: t.getAs[Buffer[CharSequence]](12) However, if I try to read a column with a nested JSON object like this: t.getAs[Map[String, Any]](11) I get the following error: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to scala.collection.immutable.Map How can I read such a field? Am I just missing something small or should I be looking for a completely different alternative to reading JSON? Simone Franzini, PhD http://www.linkedin.com/in/simonefranzini
[SQL]Proper use of spark.sql.thriftserver.scheduler.pool
Hi sparkers, I'm trying to use spark.sql.thriftserver.scheduler.pool for the first time (earlier I was stuck because of https://issues.apache.org/jira/browse/SPARK-4037). I have two pools set up (screenshot of the pools omitted) and would like to issue a query against the low-priority pool. I am doing this (tried both from beeline and a different JDBC client; the output below is from beeline): SET spark.sql.thriftserver.scheduler.pool=CRON; +-+ | spark.sql.thriftserver.scheduler.pool=CRON | +-+ 1 row selected (0.09 seconds) 1: jdbc:hive2://myip:10001 select count(*) from mytable; The query executes OK but does not run against the CRON pool... Am I misusing this setting? (My goal is to allocate a large set of cores to the Thriftserver but separate out some housekeeping tasks into a low-priority pool.) Thanks for any tips.
Re: Reading nested JSON data with Spark SQL
You can extract the nested fields in sql: SELECT field.nestedField ... If you don't do that then nested fields are represented as rows within rows and can be retrieved as follows: t.getAs[Row](0).getInt(0) Also, I would write t.getAs[Buffer[CharSequence]](12) as t.getAs[Seq[String]](12) since we don't guarantee the return type will be a buffer. On Wed, Nov 19, 2014 at 1:33 PM, Simone Franzini captainfr...@gmail.com wrote: I have been using Spark SQL to read in JSON data, like so: val myJsonFile = sqc.jsonFile(args(myLocation)) myJsonFile.registerTempTable(myTable) sqc.sql(mySQLQuery).map { row = myFunction(row) } And then in myFunction(row) I can read the various columns with the Row.getX methods. However, this methods only work for basic types (string, int, ...). I was having some trouble reading columns that are arrays or maps (i.e. other JSON objects). I am now using Spark 1.2 from the Cloudera snapshot and I noticed that there is a new method getAs. I was able to use it to read for example an array of strings like so: t.getAs[Buffer[CharSequence]](12) However, if I try to read a column with a nested JSON object like this: t.getAs[Map[String, Any]](11) I get the following error: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to scala.collection.immutable.Map How can I read such a field? Am I just missing something small or should I be looking for a completely different alternative to reading JSON? Simone Franzini, PhD http://www.linkedin.com/in/simonefranzini
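Putting both suggestions together, a sketch of the two access patterns could look like this (table and field names are made up for illustration; sc is assumed to exist):

    import org.apache.spark.sql.{Row, SQLContext}

    val sqc = new SQLContext(sc)
    sqc.jsonFile("/data/myJson").registerTempTable("myTable") // path is a placeholder

    // Option 1: flatten the nested field in SQL itself.
    val flattened = sqc.sql("SELECT address.city FROM myTable")

    // Option 2: keep the struct and unwrap it as a Row within a Row.
    val unwrapped = sqc.sql("SELECT address FROM myTable").map { row =>
      val nested = row.getAs[Row](0)
      nested.getString(0) // first field of the nested struct
    }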
Re: spark-shell giving me error of unread block data
Hi Anson, We've seen this error when incompatible classes are used in the driver and executors (e.g., same class name, but the classes are different and thus the serialized data is different). This can happen for example if you're including some 3rd party libraries in your app's jar, or changing the driver/executor class paths to include these conflicting libraries. Can you clarify whether any of the above apply to your case? (For example, one easy way to trigger this is to add the spark-examples jar shipped with CDH5.2 in the classpath of your driver. That's one of the reasons I filed SPARK-4048, but I digress.) On Tue, Nov 18, 2014 at 1:59 PM, Anson Abraham anson.abra...@gmail.com wrote: I'm essentially loading a file and saving output to another location: val source = sc.textFile(/tmp/testfile.txt) source.saveAsTextFile(/tmp/testsparkoutput) when i do so, i'm hitting this error: 14/11/18 21:15:08 INFO DAGScheduler: Failed to run saveAsTextFile at console:15 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, cloudera-1.testdomain.net): java.lang.IllegalStateException: unread block data java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2421) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:162) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Cant figure out what the issue is. I'm running in CDH5.2 w/ version of spark being 1.1. The file i'm loading is literally just 7 MB. I thought it was jar files mismatch, but i did a compare and see they're all identical. But seeing as how they were all installed through CDH parcels, not sure how there would be version mismatch on the nodes and master. Oh yeah 1 master node w/ 2 worker nodes and running in standalone not through yarn. So as a just in case, i copied the jars from the master to the 2 worker nodes as just in case, and still same issue. Weird thing is, first time i installed and tested it out, it worked, but now it doesn't. Any help here would be greatly appreciated. -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Strategies for reading large numbers of files
Hi Landon, I tried this but it didn't work for me. I get a Task not serializable exception: Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration How do you make the org.apache.hadoop.conf.Configuration (hadoopConfiguration) available to tasks? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Strategies-for-reading-large-numbers-of-files-tp15644p19314.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
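Hadoop's Configuration isn't Serializable, so it can't be captured in a task closure as-is. One hedged way around it is to construct the Configuration on the executor side, inside mapPartitions, rather than referencing the driver's instance; for example:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.rdd.RDD

    // Builds the Configuration per partition on the executor, so nothing
    // non-serializable is captured from the driver.
    def fileSizes(paths: RDD[String]): RDD[Long] =
      paths.mapPartitions { iter =>
        val conf = new Configuration() // picks up the cluster config from the executor classpath
        val fs = FileSystem.get(conf)
        iter.map(p => fs.getFileStatus(new Path(p)).getLen)
      }

If specific settings from the driver's hadoopConfiguration are needed, another option is to broadcast just those key/value strings and set them on the executor-side Configuration.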
Re: Spark Streaming with Flume or Kafka?
Thank you for your answer, I don't know if I typed the question correctly. But your nswer helps me. I'm going to make the question again for knowing if you understood me. I have this topology: DataSource1, , DataSourceN -- Kafka -- SparkStreaming -- HDFS DataSource1, , DataSourceN -- Flume -- SparkStreaming -- HDFS All data are going to be pro 2014-11-19 21:50 GMT+01:00 Hari Shreedharan hshreedha...@cloudera.com: Btw, if you want to write to Spark Streaming from Flume -- there is a sink (it is a part of Spark, not Flume). See Approach 2 here: http://spark.apache.org/docs/latest/streaming-flume-integration.html On Wed, Nov 19, 2014 at 12:41 PM, Hari Shreedharan hshreedha...@cloudera.com wrote: As of now, you can feed Spark Streaming from both kafka and flume. Currently though there is no API to write data back to either of the two directly. I sent a PR which should eventually add something like this: https://github.com/harishreedharan/spark/blob/Kafka-output/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala that would allow Spark Streaming to write back to Kafka. This will likely be reviewed and committed after 1.2. I would consider writing something similar to push data to Flume as well, if there is a sufficient use-case for it. I have seen people talk about writing back to kafka quite a bit - hence the above patch. Which one is better is upto your use-case and existing infrastructure and preference. Both would work as is, but writing back to Flume would usually be if you want to write to HDFS/HBase/Solr etc -- which you could write back directly from Spark Streaming (of course, there are benefits of writing back using Flume like the additional buffering etc Flume gives), but it is still possible to do so from Spark Streaming itself. But for Kafka, the usual use-case is a variety of custom applications reading the same data -- for which it makes a whole lot of sense to write back to Kafka. An example is to sanitize incoming data in Spark Streaming (from Flume or Kafka or something else) and make it available for a variety of apps via Kafka. Hope this helps! Hari On Wed, Nov 19, 2014 at 8:10 AM, Guillermo Ortiz konstt2...@gmail.com wrote: Hi, I'm starting with Spark and I just trying to understand if I want to use Spark Streaming, should I use to feed it Flume or Kafka? I think there's not a official Sink for Flume to Spark Streaming and it seems that Kafka it fits better since gives you readibility. Could someone give a good scenario for each alternative? When would it make sense to use Kafka and when Flume for Spark Streaming? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming with Flume or Kafka?
Thank you for your answer, I don't know if I typed the question correctly. But your nswer helps me. I'm going to make the question again for knowing if you understood me. I have this topology: DataSource1, , DataSourceN -- Kafka -- SparkStreaming -- HDFS Kafka -- HDFS (raw data) DataSource1, , DataSourceN -- Flume -- SparkStreaming -- HDFS Flume -- HDFS (raw data) All data are going to be processed and going to HDFS as raw and processed data. I don't know if it makes sense to use Kafka in this case if data are just going to HDFS. I guess that before this FlumeSpark Sink has more sense to feed SparkStream with a real-time flow of data.. It doesn't look too much sense to have SparkStreaming and get the data from HDFS. 2014-11-19 22:55 GMT+01:00 Guillermo Ortiz konstt2...@gmail.com: Thank you for your answer, I don't know if I typed the question correctly. But your nswer helps me. I'm going to make the question again for knowing if you understood me. I have this topology: DataSource1, , DataSourceN -- Kafka -- SparkStreaming -- HDFS DataSource1, , DataSourceN -- Flume -- SparkStreaming -- HDFS All data are going to be pro 2014-11-19 21:50 GMT+01:00 Hari Shreedharan hshreedha...@cloudera.com: Btw, if you want to write to Spark Streaming from Flume -- there is a sink (it is a part of Spark, not Flume). See Approach 2 here: http://spark.apache.org/docs/latest/streaming-flume-integration.html On Wed, Nov 19, 2014 at 12:41 PM, Hari Shreedharan hshreedha...@cloudera.com wrote: As of now, you can feed Spark Streaming from both kafka and flume. Currently though there is no API to write data back to either of the two directly. I sent a PR which should eventually add something like this: https://github.com/harishreedharan/spark/blob/Kafka-output/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala that would allow Spark Streaming to write back to Kafka. This will likely be reviewed and committed after 1.2. I would consider writing something similar to push data to Flume as well, if there is a sufficient use-case for it. I have seen people talk about writing back to kafka quite a bit - hence the above patch. Which one is better is upto your use-case and existing infrastructure and preference. Both would work as is, but writing back to Flume would usually be if you want to write to HDFS/HBase/Solr etc -- which you could write back directly from Spark Streaming (of course, there are benefits of writing back using Flume like the additional buffering etc Flume gives), but it is still possible to do so from Spark Streaming itself. But for Kafka, the usual use-case is a variety of custom applications reading the same data -- for which it makes a whole lot of sense to write back to Kafka. An example is to sanitize incoming data in Spark Streaming (from Flume or Kafka or something else) and make it available for a variety of apps via Kafka. Hope this helps! Hari On Wed, Nov 19, 2014 at 8:10 AM, Guillermo Ortiz konstt2...@gmail.com wrote: Hi, I'm starting with Spark and I just trying to understand if I want to use Spark Streaming, should I use to feed it Flume or Kafka? I think there's not a official Sink for Flume to Spark Streaming and it seems that Kafka it fits better since gives you readibility. Could someone give a good scenario for each alternative? When would it make sense to use Kafka and when Flume for Spark Streaming? 
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark-shell giving me error of unread block data
yeah but in this case i'm not building any files. just deployed out config files in CDH5.2 and initiated a spark-shell to just read and output a file. On Wed Nov 19 2014 at 4:52:51 PM Marcelo Vanzin van...@cloudera.com wrote: Hi Anson, We've seen this error when incompatible classes are used in the driver and executors (e.g., same class name, but the classes are different and thus the serialized data is different). This can happen for example if you're including some 3rd party libraries in your app's jar, or changing the driver/executor class paths to include these conflicting libraries. Can you clarify whether any of the above apply to your case? (For example, one easy way to trigger this is to add the spark-examples jar shipped with CDH5.2 in the classpath of your driver. That's one of the reasons I filed SPARK-4048, but I digress.) On Tue, Nov 18, 2014 at 1:59 PM, Anson Abraham anson.abra...@gmail.com wrote: I'm essentially loading a file and saving output to another location: val source = sc.textFile(/tmp/testfile.txt) source.saveAsTextFile(/tmp/testsparkoutput) when i do so, i'm hitting this error: 14/11/18 21:15:08 INFO DAGScheduler: Failed to run saveAsTextFile at console:15 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, cloudera-1.testdomain.net): java.lang.IllegalStateException: unread block data java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode( ObjectInputStream.java:2421) java.io.ObjectInputStream.readObject0(ObjectInputStream. java:1382) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) java.io.ObjectInputStream.readOrdinaryObject( ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream. java:1350) java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) org.apache.spark.serializer.JavaDeserializationStream. readObject(JavaSerializer.scala:62) org.apache.spark.serializer.JavaSerializerInstance. 
deserialize(JavaSerializer.scala:87) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:162) java.util.concurrent.ThreadPoolExecutor.runWorker( ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run( ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$ scheduler$DAGScheduler$$failJobAndIndependentStages( DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply( DAGScheduler.scala:1174) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply( DAGScheduler.scala:1173) at scala.collection.mutable.ResizableArray$class.foreach( ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage( DAGScheduler.scala:1173) at org.apache.spark.scheduler.DAGScheduler$$anonfun$ handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGScheduler$$anonfun$ handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed( DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$ $anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec( AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue. runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker( ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run( ForkJoinWorkerThread.java:107) Cant figure out what the issue is. I'm running in CDH5.2 w/ version of spark being 1.1. The file i'm loading is literally just 7 MB. I thought it was jar files mismatch, but i did a compare and see they're all identical. But seeing as how they were all installed through CDH parcels, not sure how there would be version mismatch on the nodes and master. Oh yeah 1 master node w/ 2 worker nodes and running in standalone not through yarn. So as a just in case, i copied the jars from the master to the 2 worker nodes as just in case, and still same issue. Weird thing is, first time i installed and tested it out, it worked, but now it doesn't. Any help here would be
Re: Spark on YARN
I think your config may be the issue then. It sounds like 1 server is configured in a different YARN group that states it has way less resource than it does. On Wed, Nov 19, 2014 at 5:27 PM, Alan Prando a...@scanboo.com.br wrote: Hi all! Thanks for answering! @Sean, I tried to run with 30 executor-cores , and 1 machine still without processing. @Vanzin, I checked RM's web UI, and all nodes were detecteds and RUNNING. The interesting fact is that available memory and available core of 1 node was different of other 2, with just 1 available core and 1 available gig ram. @All, I created a new cluster with 10 slaves and 1 master, and now 9 of my slaves are working, and 1 still without processing. It's fine by me! I'm just wondering why YARN's doing it... Does anyone know the answer? 2014-11-18 16:18 GMT-02:00 Sean Owen so...@cloudera.com: My guess is you're asking for all cores of all machines but the driver needs at least one core, so one executor is unable to find a machine to fit on. On Nov 18, 2014 7:04 PM, Alan Prando a...@scanboo.com.br wrote: Hi Folks! I'm running Spark on YARN cluster installed with Cloudera Manager Express. The cluster has 1 master and 3 slaves, each machine with 32 cores and 64G RAM. My spark's job is working fine, however it seems that just 2 of 3 slaves are working (htop shows 2 slaves working 100% on 32 cores, and 1 slaves without any processing). I'm using this command: ./spark-submit --master yarn --num-executors 3 --executor-cores 32 --executor-memory 32g feature_extractor.py -r 390 Additionaly, spark's log testify communications with 2 slaves only: 14/11/18 17:19:38 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-13-180.ec2.internal:33177/user/Executor#-113177469] with ID 1 14/11/18 17:19:38 INFO RackResolver: Resolved ip-172-31-13-180.ec2.internal to /default 14/11/18 17:19:38 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-13-179.ec2.internal:51859/user/Executor#-323896724] with ID 2 14/11/18 17:19:38 INFO RackResolver: Resolved ip-172-31-13-179.ec2.internal to /default 14/11/18 17:19:38 INFO BlockManagerMasterActor: Registering block manager ip-172-31-13-180.ec2.internal:50959 with 16.6 GB RAM 14/11/18 17:19:39 INFO BlockManagerMasterActor: Registering block manager ip-172-31-13-179.ec2.internal:53557 with 16.6 GB RAM 14/11/18 17:19:51 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 3(ms) Is there a configuration to call spark's job on YARN cluster with all slaves? Thanks in advance! =] --- Regards Alan Vidotti Prando. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark-shell giving me error of unread block data
On Wed, Nov 19, 2014 at 2:13 PM, Anson Abraham anson.abra...@gmail.com wrote: yeah but in this case i'm not building any files. just deployed out config files in CDH5.2 and initiated a spark-shell to just read and output a file. In that case it is a little bit weird. Just to be sure, you are using CDH's version of Spark, not trying to run an Apache Spark release on top of CDH, right? (If that's the case, then we could probably move this conversation to cdh-us...@cloudera.org, since it would be CDH-specific.) On Wed Nov 19 2014 at 4:52:51 PM Marcelo Vanzin van...@cloudera.com wrote: Hi Anson, We've seen this error when incompatible classes are used in the driver and executors (e.g., same class name, but the classes are different and thus the serialized data is different). This can happen for example if you're including some 3rd party libraries in your app's jar, or changing the driver/executor class paths to include these conflicting libraries. Can you clarify whether any of the above apply to your case? (For example, one easy way to trigger this is to add the spark-examples jar shipped with CDH5.2 in the classpath of your driver. That's one of the reasons I filed SPARK-4048, but I digress.) On Tue, Nov 18, 2014 at 1:59 PM, Anson Abraham anson.abra...@gmail.com wrote: I'm essentially loading a file and saving output to another location: val source = sc.textFile(/tmp/testfile.txt) source.saveAsTextFile(/tmp/testsparkoutput) when i do so, i'm hitting this error: 14/11/18 21:15:08 INFO DAGScheduler: Failed to run saveAsTextFile at console:15 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, cloudera-1.testdomain.net): java.lang.IllegalStateException: unread block data java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2421) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:162) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at 
scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Cant figure out what the issue is. I'm running in CDH5.2 w/ version of spark being 1.1. The file i'm loading is literally just 7 MB. I thought it was jar files mismatch, but i did a compare and see they're all identical. But seeing as how they were all installed through CDH parcels, not sure how there would be version mismatch
Re: Reading nested JSON data with Spark SQL
This works great, thank you! Simone Franzini, PhD http://www.linkedin.com/in/simonefranzini On Wed, Nov 19, 2014 at 3:40 PM, Michael Armbrust mich...@databricks.com wrote: You can extract the nested fields in sql: SELECT field.nestedField ... If you don't do that then nested fields are represented as rows within rows and can be retrieved as follows: t.getAs[Row](0).getInt(0) Also, I would write t.getAs[Buffer[CharSequence]](12) as t.getAs[Seq[String]](12) since we don't guarantee the return type will be a buffer. On Wed, Nov 19, 2014 at 1:33 PM, Simone Franzini captainfr...@gmail.com wrote: I have been using Spark SQL to read in JSON data, like so: val myJsonFile = sqc.jsonFile(args(myLocation)) myJsonFile.registerTempTable(myTable) sqc.sql(mySQLQuery).map { row = myFunction(row) } And then in myFunction(row) I can read the various columns with the Row.getX methods. However, this methods only work for basic types (string, int, ...). I was having some trouble reading columns that are arrays or maps (i.e. other JSON objects). I am now using Spark 1.2 from the Cloudera snapshot and I noticed that there is a new method getAs. I was able to use it to read for example an array of strings like so: t.getAs[Buffer[CharSequence]](12) However, if I try to read a column with a nested JSON object like this: t.getAs[Map[String, Any]](11) I get the following error: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to scala.collection.immutable.Map How can I read such a field? Am I just missing something small or should I be looking for a completely different alternative to reading JSON? Simone Franzini, PhD http://www.linkedin.com/in/simonefranzini
Re: spark-shell giving me error of unread block data
yeah CDH distribution (1.1). On Wed Nov 19 2014 at 5:29:39 PM Marcelo Vanzin van...@cloudera.com wrote: On Wed, Nov 19, 2014 at 2:13 PM, Anson Abraham anson.abra...@gmail.com wrote: yeah but in this case i'm not building any files. just deployed out config files in CDH5.2 and initiated a spark-shell to just read and output a file. In that case it is a little bit weird. Just to be sure, you are using CDH's version of Spark, not trying to run an Apache Spark release on top of CDH, right? (If that's the case, then we could probably move this conversation to cdh-us...@cloudera.org, since it would be CDH-specific.) On Wed Nov 19 2014 at 4:52:51 PM Marcelo Vanzin van...@cloudera.com wrote: Hi Anson, We've seen this error when incompatible classes are used in the driver and executors (e.g., same class name, but the classes are different and thus the serialized data is different). This can happen for example if you're including some 3rd party libraries in your app's jar, or changing the driver/executor class paths to include these conflicting libraries. Can you clarify whether any of the above apply to your case? (For example, one easy way to trigger this is to add the spark-examples jar shipped with CDH5.2 in the classpath of your driver. That's one of the reasons I filed SPARK-4048, but I digress.) On Tue, Nov 18, 2014 at 1:59 PM, Anson Abraham anson.abra...@gmail.com wrote: I'm essentially loading a file and saving output to another location: val source = sc.textFile(/tmp/testfile.txt) source.saveAsTextFile(/tmp/testsparkoutput) when i do so, i'm hitting this error: 14/11/18 21:15:08 INFO DAGScheduler: Failed to run saveAsTextFile at console:15 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, cloudera-1.testdomain.net): java.lang.IllegalStateException: unread block data java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode( ObjectInputStream.java:2421) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382) java.io.ObjectInputStream.defaultReadFields( ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) java.io.ObjectInputStream.readOrdinaryObject( ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.readObject(ObjectInputStream. java:370) org.apache.spark.serializer.JavaDeserializationStream. readObject(JavaSerializer.scala:62) org.apache.spark.serializer.JavaSerializerInstance. 
deserialize(JavaSerializer.scala:87) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:162) java.util.concurrent.ThreadPoolExecutor.runWorker( ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run( ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$ scheduler$DAGScheduler$$failJobAndIndependentStages( DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply( DAGScheduler.scala:1174) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply( DAGScheduler.scala:1173) at scala.collection.mutable.ResizableArray$class.foreach( ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage( DAGScheduler.scala:1173) at org.apache.spark.scheduler.DAGScheduler$$anonfun$ handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGScheduler$$anonfun$ handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed( DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$ $anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec( AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec( ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue. runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker( ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run( ForkJoinWorkerThread.java:107) Cant figure out what the issue is. I'm running in CDH5.2 w/
Spark Standalone Scheduling
Hi, I am running some Spark code on my cluster in standalone mode. However, I have noticed that the most powerful machines (32 cores, 192 GB memory) hardly get any tasks, whereas my small machines (8 cores, 128 GB memory) all get plenty of tasks. The resources are all displayed correctly in the web UI and the machines all have the same configuration. When the 'slaves' file contains only the powerful machines, they work fine, though. However, I would like to make use of all machines. Any idea what the reason could be, or how the scheduler decides which machine a task is assigned to? Would appreciate some help, Tassilo -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Standalone-Scheduling-tp19323.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: NEW to spark and sparksql
I would use just textFile unless you actually need a guarantee that you will be seeing a whole file at time (textFile splits on new lines). RDDs are immutable, so you cannot add data to them. You can however union two RDDs, returning a new RDD that contains all the data. On Wed, Nov 19, 2014 at 2:46 PM, Sam Flint sam.fl...@magnetic.com wrote: Michael, Thanks for your help. I found a wholeTextFiles() that I can use to import all files in a directory. I believe this would be the case if all the files existed in the same directory. Currently the files come in by the hour and are in a format somewhat like this ../2014/10/01/00/filename and there is one file per hour. Do I create an RDD and add to it? Is that possible? My example query would be select count(*) from (entire day RDD) where a=2. a would exist in all files multiple times with multiple values. I don't see in any documentation how to import a file create an RDD then import another file into that RDD. kinda like in mysql when you create a table import data then import more data. This may be my ignorance because I am not that familiar with spark, but I would expect to import data into a single RDD before performing analytics on it. Thank you for your time and help on this. P.S. I am using python if that makes a difference. On Wed, Nov 19, 2014 at 4:45 PM, Michael Armbrust mich...@databricks.com wrote: In general you should be able to read full directories of files as a single RDD/SchemaRDD. For documentation I'd suggest the programming guides: http://spark.apache.org/docs/latest/quick-start.html http://spark.apache.org/docs/latest/sql-programming-guide.html For Avro in particular, I have been working on a library for Spark SQL. Its very early code, but you can find it here: https://github.com/databricks/spark-avro Bug reports welcome! Michael On Wed, Nov 19, 2014 at 1:02 PM, Sam Flint sam.fl...@magnetic.com wrote: Hi, I am new to spark. I have began to read to understand sparks RDD files as well as SparkSQL. My question is more on how to build out the RDD files and best practices. I have data that is broken down by hour into files on HDFS in avro format. Do I need to create a separate RDD for each file? or using SparkSQL a separate SchemaRDD? I want to be able to pull lets say an entire day of data into spark and run some analytics on it. Then possibly a week, a month, etc. If there is documentation on this procedure or best practives for building RDD's please point me at them. Thanks for your time, Sam -- *MAGNE**+**I**C* *Sam Flint* | *Lead Developer, Data Analytics*
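On the one-RDD-per-file question specifically: a single textFile call accepts globs and comma-separated paths, so a whole day (or week) of hourly files can be read as one RDD, and separately loaded RDDs can still be combined with union. A Scala sketch with illustrative paths (the same calls exist in PySpark):

    // One day of hourly files as a single RDD, via a glob path.
    val day = sc.textFile("hdfs:///data/2014/10/01/*/*")

    // Or load days separately and combine them afterwards.
    val d1 = sc.textFile("hdfs:///data/2014/10/01/*/*")
    val d2 = sc.textFile("hdfs:///data/2014/10/02/*/*")
    val twoDays = d1.union(d2)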
Re: spark-shell giving me error of unread block data
Sorry meant cdh 5.2 w/ spark 1.1. On Wed, Nov 19, 2014, 17:41 Anson Abraham anson.abra...@gmail.com wrote: yeah CDH distribution (1.1). On Wed Nov 19 2014 at 5:29:39 PM Marcelo Vanzin van...@cloudera.com wrote: On Wed, Nov 19, 2014 at 2:13 PM, Anson Abraham anson.abra...@gmail.com wrote: yeah but in this case i'm not building any files. just deployed out config files in CDH5.2 and initiated a spark-shell to just read and output a file. In that case it is a little bit weird. Just to be sure, you are using CDH's version of Spark, not trying to run an Apache Spark release on top of CDH, right? (If that's the case, then we could probably move this conversation to cdh-us...@cloudera.org, since it would be CDH-specific.) On Wed Nov 19 2014 at 4:52:51 PM Marcelo Vanzin van...@cloudera.com wrote: Hi Anson, We've seen this error when incompatible classes are used in the driver and executors (e.g., same class name, but the classes are different and thus the serialized data is different). This can happen for example if you're including some 3rd party libraries in your app's jar, or changing the driver/executor class paths to include these conflicting libraries. Can you clarify whether any of the above apply to your case? (For example, one easy way to trigger this is to add the spark-examples jar shipped with CDH5.2 in the classpath of your driver. That's one of the reasons I filed SPARK-4048, but I digress.) On Tue, Nov 18, 2014 at 1:59 PM, Anson Abraham anson.abra...@gmail.com wrote: I'm essentially loading a file and saving output to another location: val source = sc.textFile(/tmp/testfile.txt) source.saveAsTextFile(/tmp/testsparkoutput) when i do so, i'm hitting this error: 14/11/18 21:15:08 INFO DAGScheduler: Failed to run saveAsTextFile at console:15 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, cloudera-1.testdomain.net): java.lang.IllegalStateException: unread block data java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode( ObjectInputStream.java:2421) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382) java.io.ObjectInputStream.defaultReadFields(ObjectInputStrea m.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream. java:1915) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStre am.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.readObject(ObjectInputStream.java :370) org.apache.spark.serializer.JavaDeserializationStream.readOb ject(JavaSerializer.scala:62) org.apache.spark.serializer.JavaSerializerInstance.deseriali ze(JavaSerializer.scala:87) org.apache.spark.executor.Executor$TaskRunner.run(Executor. scala:162) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool Executor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo lExecutor.java:615) java.lang.Thread.run(Thread.java:744) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$sch eduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply( DAGScheduler.scala:1174) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply( DAGScheduler.scala:1173) at scala.collection.mutable.ResizableArray$class.foreach(Resiza bleArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer. 
scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGSchedu ler.scala:1173) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskS etFailed$1.apply(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskS etFailed$1.apply(DAGScheduler.scala:688) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed( DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$ anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec( AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask. java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask( ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPoo l.java:1979) at
PairRDDFunctions with Tuple2 subclasses
I have a class which is a subclass of Tuple2, and I want to use it with PairRDDFunctions. However, I seem to be limited by the invariance of T in RDD[T] (see SPARK-1296 https://issues.apache.org/jira/browse/SPARK-1296). My Scala-fu is weak: the only way I could think to make this work would be to define my own equivalent of PairRDDFunctions which works with my class, does type conversions to Tuple2, and delegates to PairRDDFunctions. Does anyone know a better way? Anyone know if there will be a significant performance penalty with that approach? -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 54 W 40th St, New York, NY 10018 E: daniel.siegm...@velos.io W: www.velos.io
Re: PairRDDFunctions with Tuple2 subclasses
I think you should also be able to get away with casting it back and forth in this case using .asInstanceOf. On Wed, Nov 19, 2014 at 4:39 PM, Daniel Siegmann daniel.siegm...@velos.io wrote: I have a class which is a subclass of Tuple2, and I want to use it with PairRDDFunctions. However, I seem to be limited by the invariance of T in RDD[T] (see SPARK-1296 https://issues.apache.org/jira/browse/SPARK-1296 ). My Scala-fu is weak: the only way I could think to make this work would be to define my own equivalent of PairRDDFunctions which works with my class, does type conversions to Tuple2, and delegates to PairRDDFunctions. Does anyone know a better way? Anyone know if there will be a significant performance penalty with that approach? -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 54 W 40th St, New York, NY 10018 E: daniel.siegm...@velos.io W: www.velos.io
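Concretely, the cast-and-delegate version might look like the following sketch, with a made-up MyPair subclass standing in for the real one:

    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD

    // Hypothetical Tuple2 subclass, purely for illustration.
    class MyPair(key: String, value: Int) extends Tuple2[String, Int](key, value)

    // The upcast is safe because every MyPair is a (String, Int); the result
    // of reduceByKey is plain Tuple2s, so map back into MyPair if required.
    def sumByKey(pairs: RDD[MyPair]): RDD[MyPair] =
      pairs.asInstanceOf[RDD[(String, Int)]]
        .reduceByKey(_ + _)
        .map { case (k, v) => new MyPair(k, v) }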
Re: spark-shell giving me error of unread block data
As Marcelo mentioned, the issue occurs mostly when incompatible classes are used by executors or drivers. Check whether the output comes through on spark-shell. If yes, then most probably there is some issue with your configuration files. It would be helpful if you could paste the contents of the config files you edited. On Thu, Nov 20, 2014 at 5:45 AM, Anson Abraham anson.abra...@gmail.com wrote: Sorry meant cdh 5.2 w/ spark 1.1. On Wed, Nov 19, 2014, 17:41 Anson Abraham anson.abra...@gmail.com wrote: yeah CDH distribution (1.1). On Wed Nov 19 2014 at 5:29:39 PM Marcelo Vanzin van...@cloudera.com wrote: On Wed, Nov 19, 2014 at 2:13 PM, Anson Abraham anson.abra...@gmail.com wrote: yeah but in this case i'm not building any files. just deployed out config files in CDH5.2 and initiated a spark-shell to just read and output a file. In that case it is a little bit weird. Just to be sure, you are using CDH's version of Spark, not trying to run an Apache Spark release on top of CDH, right? (If that's the case, then we could probably move this conversation to cdh-us...@cloudera.org, since it would be CDH-specific.) On Wed Nov 19 2014 at 4:52:51 PM Marcelo Vanzin van...@cloudera.com wrote: Hi Anson, We've seen this error when incompatible classes are used in the driver and executors (e.g., same class name, but the classes are different and thus the serialized data is different). This can happen for example if you're including some 3rd party libraries in your app's jar, or changing the driver/executor class paths to include these conflicting libraries. Can you clarify whether any of the above apply to your case? (For example, one easy way to trigger this is to add the spark-examples jar shipped with CDH5.2 in the classpath of your driver. That's one of the reasons I filed SPARK-4048, but I digress.) On Tue, Nov 18, 2014 at 1:59 PM, Anson Abraham anson.abra...@gmail.com wrote: I'm essentially loading a file and saving output to another location: val source = sc.textFile("/tmp/testfile.txt") source.saveAsTextFile("/tmp/testsparkoutput") when i do so, i'm hitting this error: 14/11/18 21:15:08 INFO DAGScheduler: Failed to run saveAsTextFile at <console>:15 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, cloudera-1.testdomain.net): java.lang.IllegalStateException: unread block data java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2421) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:162) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at
Joining DStream with static file
Here is my attempt: val sparkConf = new SparkConf().setAppName("LogCounter") val ssc = new StreamingContext(sparkConf, Seconds(2)) val sc = new SparkContext() val geoData = sc.textFile("data/geoRegion.csv") .map(_.split(',')) .map(line => (line(0), (line(1), line(2), line(3), line(4)))) val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) val goodIPsFltrBI = lines.filter(...).map(...).filter(...) // details removed for brevity val vdpJoinedGeo = goodIPsFltrBI.transform(rdd => rdd.join(geoData)) This is very wrong. I have a feeling I should be broadcasting geoData instead of reading it in with each task (it's a 100MB file), but I'm not sure where to put the code that maps from the .csv to the final geoData rdd. Also I'm not sure if geoData is even defined correctly (maybe it should use ssc instead of sc?). Please advise. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Joining-DStream-with-static-file-tp19329.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: PairRDDFunctions with Tuple2 subclasses
Casting to Tuple2 is easy, but the output of reduceByKey is presumably a new Tuple2 instance so I'll need to map those to new instances of my class. Not sure how much overhead will be added by the creation of those new instances. If I do that everywhere in my code though, it will make the code really messy. That is why I was thinking of creating a wrapper which looks like PairRDDFunctions which would cast to a pair RDD, delegate to PairRDDFunctions, and then convert back to my class. I was kinda hoping a Scala wizard would come along with some black magic though. On Wed, Nov 19, 2014 at 7:45 PM, Michael Armbrust mich...@databricks.com wrote: I think you should also be able to get away with casting it back and forth in this case using .asInstanceOf. On Wed, Nov 19, 2014 at 4:39 PM, Daniel Siegmann daniel.siegm...@velos.io wrote: I have a class which is a subclass of Tuple2, and I want to use it with PairRDDFunctions. However, I seem to be limited by the invariance of T in RDD[T] (see SPARK-1296 https://issues.apache.org/jira/browse/SPARK-1296). My Scala-fu is weak: the only way I could think to make this work would be to define my own equivalent of PairRDDFunctions which works with my class, does type conversions to Tuple2, and delegates to PairRDDFunctions . Does anyone know a better way? Anyone know if there will be a significant performance penalty with that approach? -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 54 W 40th St, New York, NY 10018 E: daniel.siegm...@velos.io W: www.velos.io -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 54 W 40th St, New York, NY 10018 E: daniel.siegm...@velos.io W: www.velos.io
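One hedged way to hide that back-and-forth behind a thin wrapper, along the lines described above (MyPair is a hypothetical stand-in for the actual subclass, and only one delegated method is shown):
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
class MyPair(k: String, v: Int) extends Tuple2[String, Int](k, v) // hypothetical subclass
class MyPairFunctions(rdd: RDD[MyPair]) {
  // cast to a plain pair RDD, delegate to PairRDDFunctions, then re-wrap the Tuple2 results
  def reduceByKeyToMyPair(f: (Int, Int) => Int): RDD[MyPair] =
    rdd.asInstanceOf[RDD[(String, Int)]].reduceByKey(f).map { case (k, v) => new MyPair(k, v) }
}
implicit def toMyPairFunctions(rdd: RDD[MyPair]): MyPairFunctions = new MyPairFunctions(rdd)
The extra map does allocate a new MyPair per output record, but that cost is usually dwarfed by the shuffle the delegated operation performs anyway.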
re: How to incrementally compile spark examples using mvn
Hi Sean, Thank you for your reply. I was wondering whether there is a method of reusing locally-built components without installing them? That is, if I have successfully built the spark project as a whole, how should I configure it so that I can incrementally build (only) the spark-examples sub project without the need of downloading or installation? Thank you! Cheers, Yiming -----Original Message----- From: Sean Owen [mailto:so...@cloudera.com] Sent: November 17, 2014 17:40 To: yiming zhang Cc: Marcelo Vanzin; user@spark.apache.org Subject: Re: How to incrementally compile spark examples using mvn The downloads just happen once so this is not a problem. If you are just building one module in a project, it needs a compiled copy of other modules. It will either use your locally-built and locally-installed artifact, or, download one from the repo if possible. This isn't needed if you are compiling all modules at once. If you want to compile everything and reuse the local artifacts later, you need 'install' not 'package'. On Mon, Nov 17, 2014 at 12:27 AM, Yiming (John) Zhang sdi...@gmail.com wrote: Thank you Marcelo. I tried your suggestion (# mvn -pl :spark-examples_2.10 compile), but it required to download many spark components (as listed below), which I have already compiled on my server. Downloading: https://repo1.maven.org/maven2/org/apache/spark/spark-core_2.10/1.1.0/spark-core_2.10-1.1.0.pom ... Downloading: https://repo1.maven.org/maven2/org/apache/spark/spark-streaming_2.10/1.1.0/spark-streaming_2.10-1.1.0.pom ... Downloading: https://repository.jboss.org/nexus/content/repositories/releases/org/apache/spark/spark-hive_2.10/1.1.0/spark-hive_2.10-1.1.0.pom ... This problem didn't happen when I compiled the whole project using ``mvn -DskipTests package''. I guess some configurations have to be made to tell mvn the dependencies are local. Any idea for that? Thank you for your help! Cheers, Yiming -----Original Message----- From: Marcelo Vanzin [mailto:van...@cloudera.com] Sent: November 16, 2014 10:26 To: sdi...@gmail.com Cc: user@spark.apache.org Subject: Re: How to incrementally compile spark examples using mvn I haven't tried scala:cc, but you can ask maven to just build a particular sub-project. For example: mvn -pl :spark-examples_2.10 compile On Sat, Nov 15, 2014 at 5:31 PM, Yiming (John) Zhang sdi...@gmail.com wrote: Hi, I have already successfully compiled and run the spark examples. My problem is that if I make some modifications (e.g., on SparkPi.scala or LogQuery.scala) I have to use "mvn -DskipTests package" to rebuild the whole spark project and wait a relatively long time. I also tried "mvn scala:cc" as described in http://spark.apache.org/docs/latest/building-with-maven.html, but I could only get an infinite wait like: [INFO] --- scala-maven-plugin:3.2.0:cc (default-cli) @ spark-parent --- [INFO] wait for files to compile... Is there any method to incrementally compile the examples using mvn? Thank you! Cheers, Yiming -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Cannot access data after a join (error: value _1 is not a member of Product with Serializable)
Hi, it looks like what you are trying to use as a Tuple cannot be inferred to be a Tuple by the compiler. Try adding explicit type declarations and maybe you will see where things fail. Tobias
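For illustration, a made-up example (not the original code) of how that error usually arises and how a type annotation exposes the problem: when two branches return tuples of different shapes, the inferred element type widens to Product with Serializable, which has no _1.
val rdd = sc.parallelize(1 to 10)
// Inferred as RDD[Product with Serializable], so ._1 is not a member:
// val bad = rdd.map(n => if (n % 2 == 0) (n, "even") else (n, "odd", "!"))
// Annotating (or giving both branches the same tuple shape) restores the tuple members:
val good: org.apache.spark.rdd.RDD[(Int, String)] = rdd.map(n => if (n % 2 == 0) (n, "even") else (n, "odd"))
good.map(_._1).collect()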
How to view log on yarn-client mode?
Hi, How can I view log on yarn-client mode? When I insert the following line on mapToPair function for example, System.out.println(TEST TEST); On local mode, it is displayed on console. But on yarn-client mode, it is not on anywhere. When I use yarn resource manager web UI, the size of 'stdout' file is 0. And the size of 'stderr' file is non-zero, but it has only the following lines. Maybe it's from executor launcher, but not from executor process itself. (I'm using Spark 1.0.0) SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/grid/3/hadoop/yarn/local/filecache/10/spark-assembly-1.0.0-hadoop 2.4.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/Static LoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 14/11/20 10:42:29 INFO YarnSparkHadoopUtil: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/11/20 10:42:29 INFO SecurityManager: Changing view acls to: yarn,xcapvuze 14/11/20 10:42:29 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, xcapvuze) 14/11/20 10:42:29 INFO Slf4jLogger: Slf4jLogger started 14/11/20 10:42:29 INFO Remoting: Starting remoting 14/11/20 10:42:29 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkYarnAM@cluster04:37065] 14/11/20 10:42:29 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkYarnAM@cluster04:37065] 14/11/20 10:42:29 INFO RMProxy: Connecting to ResourceManager at cluster01/10.254.0.11:8030 14/11/20 10:42:29 INFO ExecutorLauncher: ApplicationAttemptId: appattempt_1416441180745_0003_01 14/11/20 10:42:29 INFO ExecutorLauncher: Registering the ApplicationMaster 14/11/20 10:42:29 INFO ExecutorLauncher: Waiting for Spark driver to be reachable. 14/11/20 10:42:29 INFO ExecutorLauncher: Driver now available: INNO-C-358:50050 14/11/20 10:42:29 INFO ExecutorLauncher: Listen to driver: akka.tcp://spark@INNO-C-358:50050/user/CoarseGrainedScheduler 14/11/20 10:42:29 INFO ExecutorLauncher: Allocating 3 executors. 
14/11/20 10:42:29 INFO YarnAllocationHandler: Will Allocate 3 executor containers, each with 4480 memory 14/11/20 10:42:29 INFO YarnAllocationHandler: Container request (host: Any, priority: 1, capability: memory:4480, vCores:4 14/11/20 10:42:29 INFO YarnAllocationHandler: Container request (host: Any, priority: 1, capability: memory:4480, vCores:4 14/11/20 10:42:29 INFO YarnAllocationHandler: Container request (host: Any, priority: 1, capability: memory:4480, vCores:4 14/11/20 10:42:30 INFO AMRMClientImpl: Received new token for : cluster03:45454 14/11/20 10:42:30 INFO AMRMClientImpl: Received new token for : cluster04:45454 14/11/20 10:42:30 INFO AMRMClientImpl: Received new token for : cluster02:45454 14/11/20 10:42:30 INFO RackResolver: Resolved cluster03 to /default-rack 14/11/20 10:42:30 INFO RackResolver: Resolved cluster02 to /default-rack 14/11/20 10:42:30 INFO RackResolver: Resolved cluster04 to /default-rack 14/11/20 10:42:30 INFO YarnAllocationHandler: Launching container container_1416441180745_0003_01_02 for on host cluster03 14/11/20 10:42:30 INFO YarnAllocationHandler: Launching ExecutorRunnable. driverUrl: akka.tcp://spark@INNO-C-358:50050/user/CoarseGrainedScheduler, executorHostname: cluster03 14/11/20 10:42:30 INFO YarnAllocationHandler: Launching container container_1416441180745_0003_01_04 for on host cluster02 14/11/20 10:42:30 INFO ExecutorRunnable: Starting Executor Container 14/11/20 10:42:30 INFO YarnAllocationHandler: Launching ExecutorRunnable. driverUrl: akka.tcp://spark@INNO-C-358:50050/user/CoarseGrainedScheduler, executorHostname: cluster02 14/11/20 10:42:30 INFO ExecutorRunnable: Starting Executor Container 14/11/20 10:42:30 INFO YarnAllocationHandler: Launching container container_1416441180745_0003_01_03 for on host cluster04 14/11/20 10:42:30 INFO YarnAllocationHandler: Launching ExecutorRunnable. driverUrl: akka.tcp://spark@INNO-C-358:50050/user/CoarseGrainedScheduler, executorHostname: cluster04 14/11/20 10:42:30 INFO ContainerManagementProtocolProxy: yarn.client.max-nodemanagers-proxies : 500 14/11/20 10:42:30 INFO ContainerManagementProtocolProxy: yarn.client.max-nodemanagers-proxies : 500 14/11/20 10:42:30 INFO ExecutorRunnable: Starting Executor Container 14/11/20 10:42:30 INFO ContainerManagementProtocolProxy: yarn.client.max-nodemanagers-proxies : 500 14/11/20 10:42:30 INFO ExecutorRunnable: Setting up ContainerLaunchContext 14/11/20 10:42:30 INFO
Re: Can we make EdgeRDD and VertexRDD storage level to MEMORY_AND_DISK?
Just figured it out: using the Graph constructor you can pass the storage level for both edges and vertices: Graph.fromEdges(edges, defaultValue = ("", ""), StorageLevel.MEMORY_AND_DISK, StorageLevel.MEMORY_AND_DISK) Thanks to this post: https://issues.apache.org/jira/browse/SPARK-1991 --Harihar -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-we-make-EdgeRDD-and-VertexRDD-storage-level-to-MEMORY-AND-DISK-tp19307p19335.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
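Spelled out with imports and the parameter names (a sketch assuming Spark 1.1's GraphX API, where Graph.fromEdges takes edgeStorageLevel and vertexStorageLevel arguments; the example edges are made up):
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.storage.StorageLevel
val edges = sc.parallelize(Seq(Edge(1L, 2L, "follows"), Edge(2L, 3L, "follows")))
val graph = Graph.fromEdges(
  edges,
  defaultValue = ("", ""), // default vertex attribute
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)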
insertIntoTable failure deleted pre-existing _metadata file
Hello, I'm loading json files and saving them into an existing directory of parquet files using the insertIntoTable method. If the method fails for some reason (differences in the schema, in my case), the _metadata file of the parquet dir is automatically deleted, rendering the existing parquet files useless. Looks like a bug to me. Daniel
Re: How to apply schema to queried data from Hive before saving it as parquet file?
Thanks for replying. I was unable to figure out how, after I use jsonFile/jsonRDD, to load that data into a hive table. Also, I was able to save the SchemaRDD I got via hiveContext.sql(...).saveAsParquetFile(path), i.e. save the SchemaRDD as a parquet file, but when I tried to fetch the data back from the parquet file like so (below) and save it to a text file, I got some weird values like org.apache.spark.sql.api.java.Row@e26c01c7 in the text files generated as output: JavaSchemaRDD parquetfilerdd = sqlCtx.parquetFile("path/to/parquet/File"); parquetfilerdd.registerTempTable("pq"); JavaSchemaRDD writetxt = sqlCtx.sql("SELECT * FROM pq"); writetxt.saveAsTextFile("Path/To/Text/Files"); // This step created text files filled with values like org.apache.spark.sql.api.java.Row@e26c01c7 I know there must be a way to do this right, I just haven't been able to figure it out. Could you please help. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-apply-schema-to-queried-data-from-Hive-before-saving-it-as-parquet-file-tp19259p19338.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark Streaming not working in YARN mode
I created a simple Spark Streaming program - it received numbers and computed averages and sent the results to Kafka. It worked perfectly in local mode as well as standalone master/slave mode across a two-node cluster. It did not work however in yarn-client or yarn-cluster mode. The job was accepted and running on a node but did not produce any outputs... Any suggestions? Thanks! cloud
Transform RDD.groupBY result to multiple RDDs
Hi all, Suppose I have an RDD of (K, V) tuples and I do groupBy on it with the key K. My question is: how can I make each groupBy result, which is (K, Iterable[V]), an RDD of its own? BTW, can we transform it into a DStream in which each groupBy result is an RDD? Best Regards, Kevin.
Naive Bayes classification confidence
I have been trying the Naive Bayes implementation of Spark's MLlib. During the testing phase, I wish to eliminate data with low confidence of prediction. My data set primarily consists of form-based documents like reports and application forms. They contain key-value pair style text, and hence I assume the independence condition holds better than with natural language. About the quality of priors, I am not doing anything special: I am training on a more or less equal number of samples for each class and have left the heavy lifting to MLlib. Given these facts, does it make sense to define confidence thresholds for each category, above which I will get correct results consistently? Thanks Jatin - Novice Big Data Programmer -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Naive-Baye-s-classification-confidence-tp19341.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
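For reference, a hedged sketch of how a per-class confidence can be derived from MLlib's model (assuming Spark 1.x, where NaiveBayesModel exposes labels, pi and theta as public fields; the tiny training set is made up, and any threshold you apply to the resulting probabilities is your own judgment call, not something MLlib prescribes):
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
val trainingData = sc.parallelize(Seq(
  LabeledPoint(0.0, Vectors.dense(1.0, 0.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 1.0))))
val model = NaiveBayes.train(trainingData)
// log-posterior per class = log(prior) + sum of featureValue * log(likelihood);
// shifting by the max before exponentiating keeps the normalisation numerically stable
def posteriors(x: Vector): Array[(Double, Double)] = {
  val logScores = model.labels.indices.map { i =>
    model.pi(i) + x.toArray.zip(model.theta(i)).map { case (xj, tj) => xj * tj }.sum
  }
  val exp = logScores.map(s => math.exp(s - logScores.max))
  model.labels.zip(exp.map(_ / exp.sum))
}
posteriors(Vectors.dense(1.0, 0.0)) // e.g. Array((0.0, p0), (1.0, p1)) with p0 + p1 = 1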
Re: How to view log on yarn-client mode?
While the app is running, you can find logs from the YARN web UI by navigating to containers through the Nodes link. After the app has completed, you can use the YARN logs command: yarn logs -applicationId your app ID -Sandy On Wed, Nov 19, 2014 at 6:01 PM, innowireless TaeYun Kim taeyun@innowireless.co.kr wrote: Hi, How can I view log on yarn-client mode? When I insert the following line on mapToPair function for example, System.out.println(TEST TEST); On local mode, it is displayed on console. But on yarn-client mode, it is not on anywhere. When I use yarn resource manager web UI, the size of ‘stdout’ file is 0. And the size of ‘stderr’ file is non-zero, but it has only the following lines. Maybe it’s from executor launcher, but not from executor process itself. (I’m using Spark 1.0.0) SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/grid/3/hadoop/yarn/local/filecache/10/spark-assembly-1.0.0-hadoop2.4.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 14/11/20 10:42:29 INFO YarnSparkHadoopUtil: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/11/20 10:42:29 INFO SecurityManager: Changing view acls to: yarn,xcapvuze 14/11/20 10:42:29 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, xcapvuze) 14/11/20 10:42:29 INFO Slf4jLogger: Slf4jLogger started 14/11/20 10:42:29 INFO Remoting: Starting remoting 14/11/20 10:42:29 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkYarnAM@cluster04:37065] 14/11/20 10:42:29 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkYarnAM@cluster04:37065] 14/11/20 10:42:29 INFO RMProxy: Connecting to ResourceManager at cluster01/ 10.254.0.11:8030 14/11/20 10:42:29 INFO ExecutorLauncher: ApplicationAttemptId: appattempt_1416441180745_0003_01 14/11/20 10:42:29 INFO ExecutorLauncher: Registering the ApplicationMaster 14/11/20 10:42:29 INFO ExecutorLauncher: Waiting for Spark driver to be reachable. 14/11/20 10:42:29 INFO ExecutorLauncher: Driver now available: INNO-C-358:50050 14/11/20 10:42:29 INFO ExecutorLauncher: Listen to driver: akka.tcp://spark@INNO-C-358:50050/user/CoarseGrainedScheduler 14/11/20 10:42:29 INFO ExecutorLauncher: Allocating 3 executors. 
14/11/20 10:42:29 INFO YarnAllocationHandler: Will Allocate 3 executor containers, each with 4480 memory 14/11/20 10:42:29 INFO YarnAllocationHandler: Container request (host: Any, priority: 1, capability: memory:4480, vCores:4 14/11/20 10:42:29 INFO YarnAllocationHandler: Container request (host: Any, priority: 1, capability: memory:4480, vCores:4 14/11/20 10:42:29 INFO YarnAllocationHandler: Container request (host: Any, priority: 1, capability: memory:4480, vCores:4 14/11/20 10:42:30 INFO AMRMClientImpl: Received new token for : cluster03:45454 14/11/20 10:42:30 INFO AMRMClientImpl: Received new token for : cluster04:45454 14/11/20 10:42:30 INFO AMRMClientImpl: Received new token for : cluster02:45454 14/11/20 10:42:30 INFO RackResolver: Resolved cluster03 to /default-rack 14/11/20 10:42:30 INFO RackResolver: Resolved cluster02 to /default-rack 14/11/20 10:42:30 INFO RackResolver: Resolved cluster04 to /default-rack 14/11/20 10:42:30 INFO YarnAllocationHandler: Launching container container_1416441180745_0003_01_02 for on host cluster03 14/11/20 10:42:30 INFO YarnAllocationHandler: Launching ExecutorRunnable. driverUrl: akka.tcp://spark@INNO-C-358:50050/user/CoarseGrainedScheduler, executorHostname: cluster03 14/11/20 10:42:30 INFO YarnAllocationHandler: Launching container container_1416441180745_0003_01_04 for on host cluster02 14/11/20 10:42:30 INFO ExecutorRunnable: Starting Executor Container 14/11/20 10:42:30 INFO YarnAllocationHandler: Launching ExecutorRunnable. driverUrl: akka.tcp://spark@INNO-C-358:50050/user/CoarseGrainedScheduler, executorHostname: cluster02 14/11/20 10:42:30 INFO ExecutorRunnable: Starting Executor Container 14/11/20 10:42:30 INFO YarnAllocationHandler: Launching container container_1416441180745_0003_01_03 for on host cluster04 14/11/20 10:42:30 INFO YarnAllocationHandler: Launching ExecutorRunnable. driverUrl: akka.tcp://spark@INNO-C-358:50050/user/CoarseGrainedScheduler, executorHostname: cluster04 14/11/20 10:42:30 INFO ContainerManagementProtocolProxy:
Re: How to apply schema to queried data from Hive before saving it as parquet file?
Sorry about the confusion I created. I have just started learning this week. Silly me, I was actually writing the schema to a txt file and expecting records. This is what I was supposed to do. Also, if you could let me know about adding the data from the jsonFile/jsonRDD methods of hiveContext to hive tables, it would be appreciated. JavaRDD<String> result = writetxt.map(new Function<Row, String>() { public String call(Row row) { String temp = ""; temp += row.getInt(0) + " "; temp += row.getString(1) + " "; temp += row.getInt(2); return temp; } }); result.saveAsTextFile("pqtotxt"); -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-apply-schema-to-queried-data-from-Hive-before-saving-it-as-parquet-file-tp19259p19343.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to apply schema to queried data from Hive before saving it as parquet file?
You can save the results as a parquet file or as a text file and create a hive table based on those files. Daniel On 20 Nov 2014, at 08:01, akshayhazari akshayhaz...@gmail.com wrote: Sorry about the confusion I created. I have just started learning this week. Silly me, I was actually writing the schema to a txt file and expecting records. This is what I was supposed to do. Also, if you could let me know about adding the data from the jsonFile/jsonRDD methods of hiveContext to hive tables, it would be appreciated. JavaRDD<String> result = writetxt.map(new Function<Row, String>() { public String call(Row row) { String temp = ""; temp += row.getInt(0) + " "; temp += row.getString(1) + " "; temp += row.getInt(2); return temp; } }); result.saveAsTextFile("pqtotxt"); -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-apply-schema-to-queried-data-from-Hive-before-saving-it-as-parquet-file-tp19259p19343.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
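A rough sketch of that suggestion in code, assuming a HiveContext on Spark 1.1; the path and the table name json_events are placeholders rather than names from this thread, and whether the CTAS statement is accepted depends on your Hive setup:
import org.apache.spark.sql.hive.HiveContext
val hiveContext = new HiveContext(sc)
val json = hiveContext.jsonFile("/path/to/json") // placeholder path
json.registerTempTable("json_staging")
// either materialise it as a Hive-managed table directly...
hiveContext.sql("CREATE TABLE json_events AS SELECT * FROM json_staging")
// ...or write parquet files and define a Hive table over those files afterwards:
// json.saveAsParquetFile("/warehouse/json_events_parquet")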
Re: Spark Streaming not working in YARN mode
Make sure the executor cores are set to a value which is >= 2 while submitting the job. Thanks Best Regards On Thu, Nov 20, 2014 at 10:36 AM, kam lee cloudher...@gmail.com wrote: I created a simple Spark Streaming program - it received numbers and computed averages and sent the results to Kafka. It worked perfectly in local mode as well as standalone master/slave mode across a two-node cluster. It did not work however in yarn-client or yarn-cluster mode. The job was accepted and running on a node but did not produce any outputs... Any suggestions? Thanks! cloud
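(The usual reason: each receiver occupies a core for the lifetime of the job, so the application needs more cores in total than it has receivers or the received data never gets processed. A hedged example invocation — the flags are standard spark-submit options and the values are only illustrative — is spark-submit --master yarn-client --num-executors 2 --executor-cores 2 ..., keeping the rest of the arguments unchanged.)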
Re: Transform RDD.groupBY result to multiple RDDs
What's your use case? You would not generally want to make so many small RDDs. On Nov 20, 2014 6:19 AM, Dai, Kevin yun...@ebay.com wrote: Hi, all Suppose I have a RDD of (K, V) tuple and I do groupBy on it with the key K. My question is how to make each groupBy resukt whick is (K, iterable[V]) a RDD. BTW, can we transform it as a DStream and also each groupBY result is a RDD in it? Best Regards, Kevin.
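If the key set is small and known up front, the usual (if clunky) workaround is one filter per key — a hedged sketch with String/Int standing in for K and V, and with the caveat above that this scales badly when there are many keys:
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
val grouped = pairs.groupByKey().cache() // cache so each filter below doesn't redo the shuffle
val keys = grouped.keys.collect()
val perKey: Map[String, RDD[Int]] = keys.map(k => k -> grouped.filter(_._1 == k).flatMap(_._2)).toMap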
Re: Spark Streaming with Flume or Kafka?
You can also look at the Amazon's kinesis if you don't want to handle the pain of maintaining kafka/flume infra. Thanks Best Regards On Thu, Nov 20, 2014 at 3:32 AM, Guillermo Ortiz konstt2...@gmail.com wrote: Thank you for your answer, I don't know if I typed the question correctly. But your nswer helps me. I'm going to make the question again for knowing if you understood me. I have this topology: DataSource1, , DataSourceN -- Kafka -- SparkStreaming -- HDFS Kafka -- HDFS (raw data) DataSource1, , DataSourceN -- Flume -- SparkStreaming -- HDFS Flume -- HDFS (raw data) All data are going to be processed and going to HDFS as raw and processed data. I don't know if it makes sense to use Kafka in this case if data are just going to HDFS. I guess that before this FlumeSpark Sink has more sense to feed SparkStream with a real-time flow of data.. It doesn't look too much sense to have SparkStreaming and get the data from HDFS. 2014-11-19 22:55 GMT+01:00 Guillermo Ortiz konstt2...@gmail.com: Thank you for your answer, I don't know if I typed the question correctly. But your nswer helps me. I'm going to make the question again for knowing if you understood me. I have this topology: DataSource1, , DataSourceN -- Kafka -- SparkStreaming -- HDFS DataSource1, , DataSourceN -- Flume -- SparkStreaming -- HDFS All data are going to be pro 2014-11-19 21:50 GMT+01:00 Hari Shreedharan hshreedha...@cloudera.com: Btw, if you want to write to Spark Streaming from Flume -- there is a sink (it is a part of Spark, not Flume). See Approach 2 here: http://spark.apache.org/docs/latest/streaming-flume-integration.html On Wed, Nov 19, 2014 at 12:41 PM, Hari Shreedharan hshreedha...@cloudera.com wrote: As of now, you can feed Spark Streaming from both kafka and flume. Currently though there is no API to write data back to either of the two directly. I sent a PR which should eventually add something like this: https://github.com/harishreedharan/spark/blob/Kafka-output/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala that would allow Spark Streaming to write back to Kafka. This will likely be reviewed and committed after 1.2. I would consider writing something similar to push data to Flume as well, if there is a sufficient use-case for it. I have seen people talk about writing back to kafka quite a bit - hence the above patch. Which one is better is upto your use-case and existing infrastructure and preference. Both would work as is, but writing back to Flume would usually be if you want to write to HDFS/HBase/Solr etc -- which you could write back directly from Spark Streaming (of course, there are benefits of writing back using Flume like the additional buffering etc Flume gives), but it is still possible to do so from Spark Streaming itself. But for Kafka, the usual use-case is a variety of custom applications reading the same data -- for which it makes a whole lot of sense to write back to Kafka. An example is to sanitize incoming data in Spark Streaming (from Flume or Kafka or something else) and make it available for a variety of apps via Kafka. Hope this helps! Hari On Wed, Nov 19, 2014 at 8:10 AM, Guillermo Ortiz konstt2...@gmail.com wrote: Hi, I'm starting with Spark and I just trying to understand if I want to use Spark Streaming, should I use to feed it Flume or Kafka? I think there's not a official Sink for Flume to Spark Streaming and it seems that Kafka it fits better since gives you readibility. Could someone give a good scenario for each alternative? 
When would it make sense to use Kafka and when Flume for Spark Streaming? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Joining DStream with static file
1. You don't have to create another sparkContext. you can simply call the *ssc.sparkContext* 2. May be after the transformation on geoData, you could do a persist so next time, it will be read from memory. Thanks Best Regards On Thu, Nov 20, 2014 at 6:43 AM, YaoPau jonrgr...@gmail.com wrote: Here is my attempt: val sparkConf = new SparkConf().setAppName(LogCounter) val ssc = new StreamingContext(sparkConf, Seconds(2)) val sc = new SparkContext() val geoData = sc.textFile(data/geoRegion.csv) .map(_.split(',')) .map(line = (line(0), (line(1),line(2),line(3),line(4 val topicMap = topics.split(,).map((_,numThreads.toInt)).toMap val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) val goodIPsFltrBI = lines.filter(...).map(...).filter(...) // details removed for brevity val vdpJoinedGeo = goodIPsFltrBI.transform(rdd =rdd.join(geoData)) This is very wrong. I have a feeling I should be broadcasting geoData instead of reading it in with each task (it's a 100MB file), but I'm not sure where to put the code that maps from the .csv to the final geoData rdd. Also I'm not sure if geoData is even defined correctly (maybe it should use ssc instead of sc?). Please advise. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Joining-DStream-with-static-file-tp19329.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
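A sketch of the broadcast variant the original post was reaching for (assuming Spark 1.1, and assuming goodIPsFltrBI is a DStream of (ip, value) pairs, as the join implies):
import org.apache.spark.SparkContext._ // for collectAsMap on the pair RDD
val geoMap = ssc.sparkContext.textFile("data/geoRegion.csv")
  .map(_.split(','))
  .map(l => (l(0), (l(1), l(2), l(3), l(4))))
  .collectAsMap() // the ~100MB lookup is built once on the driver
val geoBc = ssc.sparkContext.broadcast(geoMap)
val vdpJoinedGeo = goodIPsFltrBI.map { case (ip, v) =>
  (ip, (v, geoBc.value.get(ip))) // Option value; None when there is no geo entry for the IP
}
This reads and parses the csv once, ships it to each executor once instead of shuffling it for every batch, and sidesteps the sc-versus-ssc confusion because the lookup is built from ssc.sparkContext.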
Re: Spark Streaming with Flume or Kafka?
Thank you, but I'm just considering a free options. 2014-11-20 7:53 GMT+01:00 Akhil Das ak...@sigmoidanalytics.com: You can also look at the Amazon's kinesis if you don't want to handle the pain of maintaining kafka/flume infra. Thanks Best Regards On Thu, Nov 20, 2014 at 3:32 AM, Guillermo Ortiz konstt2...@gmail.com wrote: Thank you for your answer, I don't know if I typed the question correctly. But your nswer helps me. I'm going to make the question again for knowing if you understood me. I have this topology: DataSource1, , DataSourceN -- Kafka -- SparkStreaming -- HDFS Kafka -- HDFS (raw data) DataSource1, , DataSourceN -- Flume -- SparkStreaming -- HDFS Flume -- HDFS (raw data) All data are going to be processed and going to HDFS as raw and processed data. I don't know if it makes sense to use Kafka in this case if data are just going to HDFS. I guess that before this FlumeSpark Sink has more sense to feed SparkStream with a real-time flow of data.. It doesn't look too much sense to have SparkStreaming and get the data from HDFS. 2014-11-19 22:55 GMT+01:00 Guillermo Ortiz konstt2...@gmail.com: Thank you for your answer, I don't know if I typed the question correctly. But your nswer helps me. I'm going to make the question again for knowing if you understood me. I have this topology: DataSource1, , DataSourceN -- Kafka -- SparkStreaming -- HDFS DataSource1, , DataSourceN -- Flume -- SparkStreaming -- HDFS All data are going to be pro 2014-11-19 21:50 GMT+01:00 Hari Shreedharan hshreedha...@cloudera.com: Btw, if you want to write to Spark Streaming from Flume -- there is a sink (it is a part of Spark, not Flume). See Approach 2 here: http://spark.apache.org/docs/latest/streaming-flume-integration.html On Wed, Nov 19, 2014 at 12:41 PM, Hari Shreedharan hshreedha...@cloudera.com wrote: As of now, you can feed Spark Streaming from both kafka and flume. Currently though there is no API to write data back to either of the two directly. I sent a PR which should eventually add something like this: https://github.com/harishreedharan/spark/blob/Kafka-output/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaOutputWriter.scala that would allow Spark Streaming to write back to Kafka. This will likely be reviewed and committed after 1.2. I would consider writing something similar to push data to Flume as well, if there is a sufficient use-case for it. I have seen people talk about writing back to kafka quite a bit - hence the above patch. Which one is better is upto your use-case and existing infrastructure and preference. Both would work as is, but writing back to Flume would usually be if you want to write to HDFS/HBase/Solr etc -- which you could write back directly from Spark Streaming (of course, there are benefits of writing back using Flume like the additional buffering etc Flume gives), but it is still possible to do so from Spark Streaming itself. But for Kafka, the usual use-case is a variety of custom applications reading the same data -- for which it makes a whole lot of sense to write back to Kafka. An example is to sanitize incoming data in Spark Streaming (from Flume or Kafka or something else) and make it available for a variety of apps via Kafka. Hope this helps! Hari On Wed, Nov 19, 2014 at 8:10 AM, Guillermo Ortiz konstt2...@gmail.com wrote: Hi, I'm starting with Spark and I just trying to understand if I want to use Spark Streaming, should I use to feed it Flume or Kafka? 
I think there's not a official Sink for Flume to Spark Streaming and it seems that Kafka it fits better since gives you readibility. Could someone give a good scenario for each alternative? When would it make sense to use Kafka and when Flume for Spark Streaming? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
re: How to incrementally compile spark examples using mvn
Why not install them? It doesn't take any work and is the only correct way to do it. mvn install is all you need. On Nov 20, 2014 2:35 AM, Yiming (John) Zhang sdi...@gmail.com wrote: Hi Sean, Thank you for your reply. I was wondering whether there is a method of reusing locally-built components without installing them? That is, if I have successfully built the spark project as a whole, how should I configure it so that I can incrementally build (only) the spark-examples sub project without the need of downloading or installation? Thank you! Cheers, Yiming -----Original Message----- From: Sean Owen [mailto:so...@cloudera.com] Sent: November 17, 2014 17:40 To: yiming zhang Cc: Marcelo Vanzin; user@spark.apache.org Subject: Re: How to incrementally compile spark examples using mvn The downloads just happen once so this is not a problem. If you are just building one module in a project, it needs a compiled copy of other modules. It will either use your locally-built and locally-installed artifact, or, download one from the repo if possible. This isn't needed if you are compiling all modules at once. If you want to compile everything and reuse the local artifacts later, you need 'install' not 'package'. On Mon, Nov 17, 2014 at 12:27 AM, Yiming (John) Zhang sdi...@gmail.com wrote: Thank you Marcelo. I tried your suggestion (# mvn -pl :spark-examples_2.10 compile), but it required to download many spark components (as listed below), which I have already compiled on my server. Downloading: https://repo1.maven.org/maven2/org/apache/spark/spark-core_2.10/1.1.0/spark-core_2.10-1.1.0.pom ... Downloading: https://repo1.maven.org/maven2/org/apache/spark/spark-streaming_2.10/1.1.0/spark-streaming_2.10-1.1.0.pom ... Downloading: https://repository.jboss.org/nexus/content/repositories/releases/org/apache/spark/spark-hive_2.10/1.1.0/spark-hive_2.10-1.1.0.pom ... This problem didn't happen when I compiled the whole project using ``mvn -DskipTests package''. I guess some configurations have to be made to tell mvn the dependencies are local. Any idea for that? Thank you for your help! Cheers, Yiming -----Original Message----- From: Marcelo Vanzin [mailto:van...@cloudera.com] Sent: November 16, 2014 10:26 To: sdi...@gmail.com Cc: user@spark.apache.org Subject: Re: How to incrementally compile spark examples using mvn I haven't tried scala:cc, but you can ask maven to just build a particular sub-project. For example: mvn -pl :spark-examples_2.10 compile On Sat, Nov 15, 2014 at 5:31 PM, Yiming (John) Zhang sdi...@gmail.com wrote: Hi, I have already successfully compiled and run the spark examples. My problem is that if I make some modifications (e.g., on SparkPi.scala or LogQuery.scala) I have to use "mvn -DskipTests package" to rebuild the whole spark project and wait a relatively long time. I also tried "mvn scala:cc" as described in http://spark.apache.org/docs/latest/building-with-maven.html, but I could only get an infinite wait like: [INFO] --- scala-maven-plugin:3.2.0:cc (default-cli) @ spark-parent --- [INFO] wait for files to compile... Is there any method to incrementally compile the examples using mvn? Thank you! Cheers, Yiming -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
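Concretely, combining the commands already mentioned in this thread: run mvn -DskipTests install once from the project root, and afterwards mvn -pl :spark-examples_2.10 compile should resolve the other Spark modules from your local Maven repository instead of downloading them.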