Repeated data item search with Spark SQL (1.0.1)
Hi All: I am using Spark SQL 1.0.1 for a simple test. The loaded data (JSON format), which is registered as table people, is:

    {"name":"Michael", "schools":[{"name":"ABC","time":1994},{"name":"EFG","time":2000}]}
    {"name":"Andy", "age":30, "scores":{"eng":98,"phy":89}}
    {"name":"Justin", "age":19}

The schools field holds repeated values of the form {"name":..., "time":...}. How should I write the SQL to select the people who have a school named 'ABC'? I have tried

    SELECT name FROM people WHERE schools.name = 'ABC'

but that seems wrong; it fails with:

    [error] (run-main-0) org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'name, tree:
    [error] Project ['name]
    [error]  Filter ('schools.name = ABC)
    [error]   Subquery people
    [error]    ParquetRelation people.parquet, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml)
    org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'name, tree: ...
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:71)
    ...

Could anybody show me how to write the right SQL for a repeated data item search in Spark SQL? Thank you!
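For reference, queries over repeated (array) fields like schools generally needed the HiveQL parser in Spark 1.0.x rather than the basic SQL parser. A hedged sketch, assuming the people table has been registered through a HiveContext (hiveCtx) rather than a plain SQLContext:

    // Sketch only: LATERAL VIEW explode flattens each array element of
    // `schools` into its own row, so its struct fields become queryable.
    val abcPeople = hiveCtx.hql(
      "SELECT name FROM people LATERAL VIEW explode(schools) s AS school " +
      "WHERE school.name = 'ABC'")
    abcPeople.collect().foreach(println)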
can't print DStream after reduce
Hi, I have a DStream that works just fine when I say:

    dstream.print

If I say:

    dstream.map((_, 1)).print

that works, too. However, if I do the following:

    dstream.reduce { case (x, y) => x }.print

I don't get anything on my console. What's going on? Thanks
Re: Nested Query With Spark SQL (1.0.1)
Or is it supported at all? I know I could do it myself with filter, but if SQL could support it, that would be much better. Thanks!
Error in JavaKafkaWordCount.java example
Hello, I am referring to the following example: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java I am getting the following compilation error:

    \example\JavaKafkaWordCount.java:[62,70] error: cannot access ClassTag

Please help me. Thanks in advance. -- Regards, Mahebub Sayyed
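A "cannot access ClassTag" error when compiling a Java Spark program usually means the Scala runtime is not on the compile classpath. A hedged guess at a fix, assuming the example is built inside your own sbt project (the version numbers below are assumptions; match them to the Scala version your Spark build uses):

    // Assumption: building against Spark 1.0.x, which is built on Scala 2.10.
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % "2.10.4",
      "org.scala-lang" % "scala-reflect" % "2.10.4"
    )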
Problem reading in LZO compressed files
Hello, I have been trying to play with the Google ngram dataset provided by Amazon in the form of LZO compressed files. I am having trouble understanding what is going on ;). I have added the compression jar and native library to the underlying Hadoop/HDFS installation and restarted the name node and the datanodes. Spark can obviously see the file, but I get gibberish on a read. Any ideas? See output below:

    14/07/13 14:39:19 INFO SparkContext: Added JAR file:/home/ec2-user/hadoop/lib/hadoop-gpl-compression-0.1.0.jar at http://10.10.0.100:40100/jars/hadoop-gpl-compression-0.1.0.jar with timestamp 1405262359777
    14/07/13 14:39:20 INFO SparkILoop: Created spark context..
    Spark context available as sc.

    scala> val f = sc.textFile("hdfs://10.10.0.98:54310/data/1gram.lzo")
    14/07/13 14:39:34 INFO MemoryStore: ensureFreeSpace(163793) called with curMem=0, maxMem=311387750
    14/07/13 14:39:34 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 160.0 KB, free 296.8 MB)
    f: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12

    scala> f.take(10)
    14/07/13 14:39:43 INFO SparkContext: Job finished: take at <console>:15, took 0.419708348 s
    res0: Array[String] = Array(SEQ?!org.apache.hadoop.io.LongWritable?org.apache.hadoop.io.Text??#com.hadoop.compression.lzo.LzoCodec... [binary gibberish] ...)

Thanks! Ognen
Re: Problem reading in LZO compressed files
If you’re still seeing gibberish, it’s because Spark is not using the LZO libraries properly. In your case, I believe you should be calling newAPIHadoopFile() instead of textFile(). For example:

    sc.newAPIHadoopFile(
      "s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-us-all/1gram/data",
      classOf[com.hadoop.mapreduce.LzoTextInputFormat],
      classOf[org.apache.hadoop.io.LongWritable],
      classOf[org.apache.hadoop.io.Text])

On a side note, here’s a related JIRA issue: SPARK-2394: Make it easier to read LZO-compressed files from EC2 clusters https://issues.apache.org/jira/browse/SPARK-2394

Nick

On Sun, Jul 13, 2014 at 10:49 AM, Ognen Duzlevski ognen.duzlev...@gmail.com wrote: [earlier message snipped]
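Note that newAPIHadoopFile() returns (LongWritable, Text) key/value pairs rather than strings, so one more step is typically needed to get back to an RDD[String]. A small sketch under the same assumptions as the example above:

    // newAPIHadoopFile yields (LongWritable, Text) pairs, not strings.
    val pairs = sc.newAPIHadoopFile(
      "s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-us-all/1gram/data",
      classOf[com.hadoop.mapreduce.LzoTextInputFormat],
      classOf[org.apache.hadoop.io.LongWritable],
      classOf[org.apache.hadoop.io.Text])
    // Convert the Hadoop Text values to plain Scala strings.
    val lines = pairs.map { case (_, text) => text.toString }
    lines.take(10).foreach(println)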
Re: Confused by groupByKey() and the default partitioner
Thanks, Aaron. I replaced groupByKey with reduceByKey along with some list concatenation operations, and found that the performance became even worse. So groupByKey is not that bad in my code.

Best regards,
- Guanhua

From: Aaron Davidson ilike...@gmail.com
Date: Sat, 12 Jul 2014 16:32:22 -0700
To: user@spark.apache.org
Subject: Re: Confused by groupByKey() and the default partitioner

Yes, groupByKey() does partition by the hash of the key unless you specify a custom Partitioner. (1) If you were to use groupByKey() when the data was already partitioned correctly, the data would indeed not be shuffled. Here is the associated code; you'll see that it simply checks that the Partitioner the groupBy() is looking for is equal to the Partitioner of the pre-existing RDD: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L89 By the way, I should warn you that groupByKey() is not a recommended operation if you can avoid it, as it has non-obvious performance issues when running with serious data.

On Sat, Jul 12, 2014 at 12:20 PM, Guanhua Yan gh...@lanl.gov wrote: Hi: I have trouble understanding the default partitioner (hash) in Spark. Suppose that an RDD with two partitions is created as follows:

    x = sc.parallelize([("a", 1), ("b", 4), ("a", 10), ("c", 7)], 2)

Does Spark partition x based on the hash of the key (e.g., "a", "b", "c") by default? (1) Assuming this is correct, if I further use the groupByKey primitive, x.groupByKey(), all the records sharing the same key should be located in the same partition. Then it's not necessary to shuffle the data records around, as all the grouping operations can be done locally. (2) If it's not true, how could I specify a partitioner simply based on the hashing of the key (in Python)? Thank you, - Guanhua
Re: Problem reading in LZO compressed files
Nicholas, Thanks! How do I make Spark assemble against a local version of Hadoop? I have 2.4.1 running on a test cluster and I did SPARK_HADOOP_VERSION=2.4.1 sbt/sbt assembly, but all it did was pull in hadoop-2.4.1 dependencies via sbt (which is sufficient for using a 2.4.1 HDFS). I am guessing my local version of the Hadoop libraries/jars is not used. Alternatively, how do I add the hadoop-gpl-compression-0.1.0.jar (responsible for the lzo stuff) to this hand-assembled Spark? I am running the spark-shell like this:

    bin/spark-shell --jars /home/ec2-user/hadoop/lib/hadoop-gpl-compression-0.1.0.jar

and getting this:

    scala> val f = sc.newAPIHadoopFile("hdfs://10.10.0.98:54310/data/1gram.lzo", classOf[com.hadoop.mapreduce.LzoTextInputFormat], classOf[org.apache.hadoop.io.LongWritable], classOf[org.apache.hadoop.io.Text])
    14/07/13 16:53:01 INFO MemoryStore: ensureFreeSpace(216014) called with curMem=0, maxMem=311387750
    14/07/13 16:53:01 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 211.0 KB, free 296.8 MB)
    f: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)] = NewHadoopRDD[0] at newAPIHadoopFile at <console>:12

    scala> f.take(1)
    14/07/13 16:53:08 INFO FileInputFormat: Total input paths to process : 1
    java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
      at com.hadoop.mapreduce.LzoTextInputFormat.listStatus(LzoTextInputFormat.java:67)

which makes me think something is not linked to something properly (not a Java expert, unfortunately). Thanks! Ognen

On 7/13/14, 10:35 AM, Nicholas Chammas wrote: [earlier message snipped]
Re: Confused by groupByKey() and the default partitioner
Ah -- I should have been more clear: list concatenation isn't going to be any faster. In many cases I've seen people use groupByKey() when they are really trying to do some sort of aggregation, and thus constructing this concatenated list is more expensive than they need.

On Sun, Jul 13, 2014 at 9:13 AM, Guanhua Yan gh...@lanl.gov wrote: [earlier message snipped]
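To make that point concrete, a minimal illustration (not the poster's actual code): if the goal is a per-key sum, reduceByKey combines values within each partition before the shuffle, while groupByKey ships every value across the network first.

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 4), ("a", 10), ("c", 7)), 2)

    // Shuffles every value, then sums the grouped lists on the reduce side.
    val viaGroup = pairs.groupByKey().mapValues(_.sum)

    // Combines values map-side first, so far less data crosses the network.
    val viaReduce = pairs.reduceByKey(_ + _)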
Re: can't print DStream after reduce
Try doing DStream.foreachRDD and then printing the RDD count and further inspecting the RDD.

On Jul 13, 2014 1:03 AM, Walrus theCat walrusthe...@gmail.com wrote: [earlier message snipped]
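A slightly expanded sketch of the suggestion above (names are illustrative):

    dstream.foreachRDD { rdd =>
      // Print the batch size and a small sample for each micro-batch.
      println("count = " + rdd.count())
      rdd.take(5).foreach(println)
    }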
Re: spark ui on yarn
My YARN environment does have less memory for the executors. I am checking whether the RDDs are cached by calling sc.getRDDStorageInfo, which shows an RDD as fully cached in memory, yet it does not show up in the UI.

On Sun, Jul 13, 2014 at 1:49 AM, Matei Zaharia matei.zaha...@gmail.com wrote: The UI code is the same in both, but one possibility is that your executors were given less memory on YARN. Can you check that? Or otherwise, how do you know that some RDDs were cached? Matei

On Jul 12, 2014, at 4:12 PM, Koert Kuipers ko...@tresata.com wrote: hey shuo, so far all stage links work fine for me. i did some more testing, and it seems kind of random what shows up on the gui and what does not. some partially cached RDDs make it to the GUI, while some fully cached ones do not. I have not been able to detect a pattern. is the codebase for the gui different in standalone than in yarn-client mode?

On Sat, Jul 12, 2014 at 3:34 AM, Shuo Xiang shuoxiang...@gmail.com wrote: Hi Koert, Just curious: did you find any information like CANNOT FIND ADDRESS after clicking into some stage? I've seen similar problems due to loss of executors. Best,

On Fri, Jul 11, 2014 at 4:42 PM, Koert Kuipers ko...@tresata.com wrote: I just tested a long-lived application (that we normally run in standalone mode) on YARN in client mode. it looks to me like cached rdds are missing in the storage tab of the ui. accessing the rdd storage information via the spark context shows rdds as fully cached, but they are missing on the storage page. spark 1.0.0
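For anyone else comparing the UI against the context, a minimal sketch of inspecting cache state programmatically (field names per the Spark 1.0 API):

    // Each RDDInfo reports how much of the RDD is actually cached and where.
    sc.getRDDStorageInfo.foreach { info =>
      println(s"${info.name}: ${info.numCachedPartitions}/${info.numPartitions} " +
        s"partitions cached, ${info.memSize} bytes in memory")
    }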
Re: can't print DStream after reduce
Update on this:

    val lines = ssc.socketTextStream("localhost", ...)

    lines.print // works
    lines.map(_ -> 1).print // works
    lines.map(_ -> 1).reduceByKey(_ + _).print // nothing printed to driver console

Just lots of:

    14/07/13 11:37:40 INFO receiver.BlockGenerator: Pushed block input-0-1405276660400
    14/07/13 11:37:41 INFO scheduler.ReceiverTracker: Stream 0 received 1 blocks
    14/07/13 11:37:41 INFO scheduler.JobScheduler: Added jobs for time 1405276661000 ms
    14/07/13 11:37:41 INFO storage.MemoryStore: ensureFreeSpace(60) called with curMem=1275, maxMem=98539929
    14/07/13 11:37:41 INFO storage.MemoryStore: Block input-0-1405276661400 stored as bytes to memory (size 60.0 B, free 94.0 MB)
    14/07/13 11:37:41 INFO storage.BlockManagerInfo: Added input-0-1405276661400 in memory on 25.17.218.118:55820 (size: 60.0 B, free: 94.0 MB)
    14/07/13 11:37:41 INFO storage.BlockManagerMaster: Updated info of block input-0-1405276661400

Any insight? Thanks

On Sun, Jul 13, 2014 at 1:03 AM, Walrus theCat walrusthe...@gmail.com wrote: [earlier message snipped]
Re: can't print DStream after reduce
Thanks for your interest. I tried:

    lines.foreachRDD(x => println(x.count))

and I got 0 every once in a while (which I think is strange, because lines.print prints the input I'm giving it over the socket). When I tried:

    lines.map(_ -> 1).reduceByKey(_ + _).foreachRDD(x => println(x.count))

I got no count. Thanks

On Sun, Jul 13, 2014 at 11:34 AM, Tathagata Das tathagata.das1...@gmail.com wrote: [earlier message snipped]
SparkSql newbie problems with nested selects
Hi, I am running into trouble with a nested query using Python. To try and debug it, I first wrote the query I want using sqlite3:

    select freq.docid, freqTranspose.docid, sum(freq.count * freqTranspose.count)
    from Frequency as freq,
         (select term, docid, count from Frequency) as freqTranspose
    where freq.term = freqTranspose.term
    group by freq.docid, freqTranspose.docid;

Spark SQL has trouble parsing the "(select ...) as freqTranspose" line. Here is what my input data looks like:

    $ head -n 3 reuters.db.csv
    docid,term,count
    1_txt_earn,net,1
    1_txt_earn,rogers,4

The output from sqlite3 is:

    $ head -n 6 3hSimilarityMatrix.slow.sql.out
    freq.docid  freqTranspose.docid  sum(freq.count * freqTranspose.count)
    ----------  -------------------  --------------------------------------
    1_txt_earn  1_txt_earn           127
    1_txt_earn  10054_txt_earn       33
    1_txt_earn  10080_txt_crude      146
    1_txt_earn  10088_txt_acq        11
    $

My code example pretty much follows http://spark.apache.org/docs/latest/sql-programming-guide.html

    dataFile = sc.textFile("reuters.db.csv")
    lines = dataFile.map(lambda l: l.split(","))

    def mapLines(line):
        ret = {}
        ret['docid'] = line[0]
        ret['term'] = line[1]
        ret['count'] = line[2]
        return ret

    frequency = lines.map(mapLines)
    schemaFrequency = sqlContext.inferSchema(frequency)
    schemaFrequency.registerAsTable("frequency")

Okay, here is where I run into trouble:

    sqlCmd = "select \
                freq.docid, \
                freqTranspose.docid \
              from \
                frequency as freq, \
                (select term, docid, count from frequency) \
              "
    similarities = sqlContext.sql(sqlCmd)

    /Users/andy/workSpace/dataBricksIntroToApacheSpark/USBStick/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
        298             raise Py4JJavaError(
        299                 'An error occurred while calling {0}{1}{2}.\n'.
    --> 300                 format(target_id, '.', name), value)
        301         else:
        302             raise Py4JError(

    Py4JJavaError: An error occurred while calling o40.sql.
    : java.lang.RuntimeException: [1.153] failure: ``('' expected but `from' found

    select freq.docid, freqTranspose.docid from frequency as freq, (select term, docid, count from frequency)

Simple SQL seems to parse, i.e. "select freq.docid from frequency as freq". Any suggestions would be greatly appreciated.

Andy

P.S. I should note, I think I am using version 1.0?
Re: SparkSql newbie problems with nested selects
Hi Andy, The SQL parser is pretty basic (we plan to improve this for the 1.2 release). In this case I think part of the problem is that one of your variables is count, which is a reserved word. Unfortunately, we don't have the ability to escape identifiers at this point. However, I did manage to get your query to parse using the HiveQL parser, provided by HiveContext:

    hiveCtx.hql("""
      select freq.docid, freqTranspose.docid, sum(freq.count * freqTranspose.count)
      from Frequency freq
      JOIN (select term, docid, count from Frequency) freqTranspose
      where freq.term = freqTranspose.term
      group by freq.docid, freqTranspose.docid""")

Michael

On Sun, Jul 13, 2014 at 12:43 PM, Andy Davidson a...@santacruzintegration.com wrote: [earlier message snipped]
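For completeness, a sketch of how the hiveCtx above might be created (Scala, per the 1.0 programming guide; assumes a Hive-enabled Spark build and an existing SparkContext sc):

    import org.apache.spark.sql.hive.HiveContext

    val hiveCtx = new HiveContext(sc)
    // Tables registered through hiveCtx are visible to its hql() queries.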
Task serialized size dependent on size of RDD?
Hi, I'm having trouble serializing tasks for this code:

    val rddC = (rddA join rddB)
      .map { case (x, (y, z)) => z -> y }
      .reduceByKey((y1, y2) => Semigroup.plus(y1, y2), 1000)

Somehow, when running on a small data set, the size of the serialized task is about 650KB, which is very big; when running on a big data set it's about 1.3MB, which is huge. I can't find what's causing it. The only reference to an object outside the scope of the closure is to a static method on Semigroup, which should serialize fine. The proof that it's okay to call Semigroup like that is that I have another operation in the same job that uses it, and that one serializes okay (~30KB). The part I really don't get is: why is the size of the serialized task dependent on the size of the RDD? I haven't used parallelize to create rddA and rddB, but rather derived them from transformations of hive tables. Thanks, - Sebastien
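One way to narrow this down is to make the closure's captures explicit. A hedged sketch (Semigroup stands for the poster's own class and the value type is assumed): binding the static method to a local function value prevents the closure from accidentally dragging in whatever object encloses the code, a common source of unexpectedly large serialized tasks.

    // Hedged sketch: bind the method to a local val so only the function
    // value itself is captured, not any enclosing object.
    val plus: (Long, Long) => Long = Semigroup.plus
    val rddC = (rddA join rddB)
      .map { case (x, (y, z)) => z -> y }
      .reduceByKey(plus, 1000)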
Re: can't print DStream after reduce
More strange behavior:

    lines.foreachRDD(x => println(x.first)) // works
    lines.foreachRDD(x => println((x.count, x.first))) // no output is printed to driver console

On Sun, Jul 13, 2014 at 11:47 AM, Walrus theCat walrusthe...@gmail.com wrote: [earlier message snipped]
Re: SparkSql newbie problems with nested selects
Hi Michael, Changing my column name to something other than 'count' fixed the parse error. Many thanks, Andy

From: Michael Armbrust mich...@databricks.com
Date: Sunday, July 13, 2014 at 1:18 PM
To: user@spark.apache.org
Subject: Re: SparkSql newbie problems with nested selects

[earlier message snipped]
Re: can't print DStream after reduce
Great success! I was able to get output to the driver console by changing the construction of the StreamingContext from:

    val ssc = new StreamingContext("local" /** TODO change once a cluster is up **/, "AppName", Seconds(1))

to:

    val ssc = new StreamingContext("local[2]" /** TODO change once a cluster is up **/, "AppName", Seconds(1))

I found something that tipped me off that this might work by digging through this mailing list.

On Sun, Jul 13, 2014 at 2:00 PM, Walrus theCat walrusthe...@gmail.com wrote: [earlier messages snipped]
Ideal core count within a single JVM
Hello, What would be an ideal core count to run a Spark job in local mode to get the best utilization of the CPU? I actually have a 48-core machine, but the performance of local[48] is poor compared to local[10]. Lokesh
Re: can't print DStream after reduce
This almost had me not using Spark; I couldn't get any output. It is not at all obvious to the layman what's going on here (and, to the best of my knowledge, it isn't documented anywhere), but now that you know, you'll be able to answer this question for the numerous people who will also have it.

On Sun, Jul 13, 2014 at 5:24 PM, Walrus theCat walrusthe...@gmail.com wrote: [earlier messages snipped]
Re: not getting output from socket connection
Make sure you use local[n] (where n > 1) in your context setup too (if you're running locally), or you won't get output.

On Sat, Jul 12, 2014 at 11:36 PM, Walrus theCat walrusthe...@gmail.com wrote: Thanks! I thought it would get passed through netcat, but given your email, I was able to follow this tutorial and get it to work: http://docs.oracle.com/javase/tutorial/networking/sockets/clientServer.html

On Fri, Jul 11, 2014 at 1:31 PM, Sean Owen so...@cloudera.com wrote: netcat is listening for a connection on the port. It echoes what you type into its console to anything that connects to that port and reads. That is what Spark Streaming does. If you yourself connect to the port and write, nothing happens except that netcat echoes it. This does not cause Spark to somehow get that data; nc only echoes input from the console.

On Fri, Jul 11, 2014 at 9:25 PM, Walrus theCat walrusthe...@gmail.com wrote: Hi, I have a java application that is outputting a string every second. I'm running the wordcount example that comes with Spark 1.0, and running nc -lk on the same port. When I type words into the terminal running netcat, I get counts. However, when I write the String onto a socket on that port, I don't get counts. I can see the strings showing up in the netcat terminal, but no counts from Spark. If I paste in the string, I get counts. Any ideas? Thanks
Re: can't print DStream after reduce
How about a PR that rejects a context configured for local or local[1]? As I understand it, this is not intended to work and has bitten several people.

On Jul 14, 2014 12:24 AM, Michael Campbell michael.campb...@gmail.com wrote: [earlier messages snipped]
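A rough sketch of the kind of check being proposed (purely illustrative; this is not the actual StreamingContext code):

    // Hypothetical guard: reject masters that leave no threads for processing,
    // since the receiver alone would occupy the single available thread.
    def validateMaster(master: String): Unit = {
      if (master == "local" || master == "local[1]")
        throw new IllegalArgumentException(
          "Spark Streaming needs at least two threads; use local[n] with n >= 2")
    }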
Re: Problem reading in LZO compressed files
I actually never got this to work, which is part of the reason why I filed that JIRA. Apart from using --jars when starting the shell, I don’t have any more pointers for you. :(

On Sun, Jul 13, 2014 at 12:57 PM, Ognen Duzlevski ognen.duzlev...@gmail.com wrote: [earlier message snipped]
Re: Spark Streaming with Kafka NoClassDefFoundError
In case you still have issues with duplicate files in the uber jar, here is a reference sbt file with the assembly plugin that deals with duplicates: https://github.com/databricks/training/blob/sparkSummit2014/streaming/scala/build.sbt

On Fri, Jul 11, 2014 at 10:06 AM, Bill Jay bill.jaypeter...@gmail.com wrote: You may try to use this one: https://github.com/sbt/sbt-assembly I had an issue of duplicate files in the uber jar file, but I think this library will assemble dependencies into a single jar file. Bill

On Fri, Jul 11, 2014 at 1:34 AM, Dilip dilip_ram...@hotmail.com wrote: A simple sbt assembly is not working. Is there any other way to include particular jars with the assembly command? Regards, Dilip

On Friday 11 July 2014 12:45 PM, Bill Jay wrote: I have met similar issues. The reason is probably that spark-streaming-kafka is not included in the Spark assembly. Currently, I am using Maven to generate a shaded package with all the dependencies. You may try to use sbt assembly to include the dependencies in your jar file. Bill

On Thu, Jul 10, 2014 at 11:48 PM, Dilip dilip_ram...@hotmail.com wrote: Hi Akhil, Can you please guide me through this? Because the code I am running already has this in it:

    [java]
    SparkContext sc = new SparkContext();
    sc.addJar("/usr/local/spark/external/kafka/target/scala-2.10/spark-streaming-kafka_2.10-1.1.0-SNAPSHOT.jar");

Is there something I am missing? Thanks, Dilip

On Friday 11 July 2014 12:02 PM, Akhil Das wrote: Easiest fix would be adding the kafka jars to the SparkContext while creating it. Thanks Best Regards

On Fri, Jul 11, 2014 at 4:39 AM, Dilip dilip_ram...@hotmail.com wrote: Hi, I am trying to run a program with spark streaming using Kafka on a standalone system. These are my details: Spark 1.0.0 hadoop2, Scala 2.10.3. I am trying a simple program using my custom sbt project, but this is the error I am getting:

    Exception in thread "main" java.lang.NoClassDefFoundError: kafka/serializer/StringDecoder
      at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:55)
      at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:94)
      at org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala)
      at SimpleJavaApp.main(SimpleJavaApp.java:40)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:606)
      at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
      at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
      at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.lang.ClassNotFoundException: kafka.serializer.StringDecoder
      at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
      at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
      at java.security.AccessController.doPrivileged(Native Method)
      at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
      at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
      at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
      ... 11 more

here is my .sbt file:

    name := "Simple Project"

    version := "1.0"

    scalaVersion := "2.10.3"

    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"

    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"

    libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.0"

    libraryDependencies += "org.apache.spark" %% "spark-examples" % "1.0.0"

    libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.0.0"

    libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.0"

    resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

    resolvers += "Maven Repository" at "http://central.maven.org/maven2/"

sbt package was successful. I also tried sbt ++2.10.3 package to build it for my Scala version. The problem remains the same. Can anyone help me out here? I've been stuck on this for quite some time now. Thank You, Dilip
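The deduplication handling in the referenced build.sbt boils down to a merge strategy. A hedged sketch for the sbt-assembly plugin of that era (the exact incantation varies with the plugin version):

    import sbtassembly.Plugin._
    import AssemblyKeys._

    assemblySettings

    mergeStrategy in assembly <<= (mergeStrategy in assembly) { old =>
      {
        // Drop duplicate metadata files instead of failing the build;
        // fall back to the default strategy for everything else.
        case PathList("META-INF", xs @ _*) => MergeStrategy.discard
        case x => old(x)
      }
    }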
Re: Spark Streaming with Kafka NoClassDefFoundError
Also, the reason spark-streaming-kafka is not included in the Spark assembly is that we do not want dependencies of external systems like Kafka (which itself probably has a complex dependency tree) to cause conflicts with core Spark's functionality and stability. TD

On Sun, Jul 13, 2014 at 5:48 PM, Tathagata Das tathagata.das1...@gmail.com wrote: [earlier message snipped]
Possible bug in ClientBase.scala?
Hi, I was doing programmatic submission of Spark YARN jobs and I saw this code in ClientBase.getDefaultYarnApplicationClasspath():

    val field = classOf[MRJobConfig].getField("DEFAULT_YARN_APPLICATION_CLASSPATH")

MRJobConfig doesn't have this field, so the created launch env is incomplete. A workaround is to set yarn.application.classpath with the value from YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH. The bug results in the spark job hanging if the submission config is different from the default config. For example, if my resource manager port is 8050 instead of 8030, then the spark app is not able to register itself and stays in the ACCEPTED state. I can easily fix this by changing this to YarnConfiguration instead of MRJobConfig, but was wondering what the steps are for submitting a fix. Thanks, Ron

Sent from my iPhone
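A sketch of the change Ron describes (hedged; the surrounding ClientBase code is paraphrased, only the reflected class changes):

    // Before (fails when MRJobConfig lacks the field):
    // val field = classOf[MRJobConfig].getField("DEFAULT_YARN_APPLICATION_CLASSPATH")

    // Proposed: read the constant from YarnConfiguration instead.
    val field = classOf[org.apache.hadoop.yarn.conf.YarnConfiguration]
      .getField("DEFAULT_YARN_APPLICATION_CLASSPATH")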
Re: Possible bug in ClientBase.scala?
On Sun, Jul 13, 2014 at 9:49 PM, Ron Gonzalez zlgonza...@yahoo.com wrote: I can easily fix this by changing this to YarnConfiguration instead of MRJobConfig but was wondering what the steps are for submitting a fix.

Relevant links:
- https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark
- https://github.com/apache/spark/pulls

Nick
Re: not getting output from socket connection
Hah, thanks for tidying up the paper trail here, but I was the OP (and solver) of the recent reduce thread that ended in this solution.

On Sun, Jul 13, 2014 at 4:26 PM, Michael Campbell michael.campb...@gmail.com wrote: [earlier message snipped]
Re: Streaming. Cannot get socketTextStream to receive anything.
Hi Akhil Das, Thanks. I tried the code and it works. There's a problem with my socket code: it is not flushing the content out, and with the test tool, Hercules, I have to close the socket connection to flush the content out. I am going to troubleshoot why nc works but my code and the test tool don't.
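For reference, a minimal sketch of a test server that avoids the flushing problem (names and the port are illustrative assumptions):

    import java.io.PrintWriter
    import java.net.ServerSocket

    // Toy server: accepts one connection and writes newline-terminated,
    // auto-flushed lines, which is what socketTextStream expects to read.
    val server = new ServerSocket(9999) // port is an assumption
    val socket = server.accept()
    val out = new PrintWriter(socket.getOutputStream, true) // autoFlush = true
    out.println("hello spark streaming") // flushed immediately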
Re: Possible bug in ClientBase.scala?
Ron, Which distribution and version of Hadoop are you using? I just looked at CDH5 (hadoop-mapreduce-client-core-2.3.0-cdh5.0.0), and MRJobConfig does have the field: java.lang.String DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH. Chester

On Sun, Jul 13, 2014 at 6:49 PM, Ron Gonzalez zlgonza...@yahoo.com wrote: [earlier message snipped]
Re: Supported SQL syntax in Spark SQL
For example, are LIKE 'string%' queries supported? Trying one on 1.0.1 yields java.lang.ExceptionInInitializerError. Nick

On Sat, Jul 12, 2014 at 10:16 PM, Nick Chammas nicholas.cham...@gmail.com wrote: Is there a place where we can find an up-to-date list of supported SQL syntax in Spark SQL? Nick
Re: Anaconda Spark AMI
Hi Ben, This is great! I just spun up an EC2 cluster and tested basic pyspark + ipython/numpy/scipy functionality, and all seems to be working so far. Will let you know if any issues arise. We do a lot with pyspark + scientific computing, and for EC2 usage I think this is a terrific way to get the core libraries installed. -- Jeremy On Jul 12, 2014, at 4:25 PM, Benjamin Zaitlen quasi...@gmail.com wrote: Hi All, Thanks to Jey's help, I have a release AMI candidate for spark-1.0/anaconda-2.0 integration. It's currently limited to availability in US-EAST: ami-3ecd0c56 Give it a try if you have some time. This should just work with spark 1.0: ./spark-ec2 -k my_key -i ~/.ssh/mykey.rsa -a ami-3ecd0c56 If you have suggestions or run into trouble please email, --Ben PS: I found that writing a noop map function is a decent way to install pkgs on worker nodes (though most scientific pkgs are pre-installed with anaconda: def subprocess_noop(x): import os os.system(/opt/anaconda/bin/conda install h5py) return 1 install_noop = rdd.map(subprocess_noop) install_noop.count() On Thu, Jul 3, 2014 at 2:32 PM, Jey Kottalam j...@cs.berkeley.edu wrote: Hi Ben, Has the PYSPARK_PYTHON environment variable been set in spark/conf/spark-env.sh to the path of the new python binary? FYI, there's a /root/copy-dirs script that can be handy when updating files on an already-running cluster. You'll want to restart the spark cluster for the changes to take effect, as described at https://spark.apache.org/docs/latest/ec2-scripts.html Hope that helps, -Jey On Thu, Jul 3, 2014 at 11:54 AM, Benjamin Zaitlen quasi...@gmail.com wrote: Hi All, I'm a dev a Continuum and we are developing a fair amount of tooling around Spark. A few days ago someone expressed interest in numpy+pyspark and Anaconda came up as a reasonable solution. I spent a number of hours yesterday trying to rework the base Spark AMI on EC2 but sadly was defeated by a number of errors. Aggregations seemed to choke -- where as small takes executed as aspected (errors are linked to the gist): sc.appName u'PySparkShell' sc._conf.getAll() [(u'spark.executor.extraLibraryPath', u'/root/ephemeral-hdfs/lib/native/'), (u'spark.executor.memory', u'6154m'), (u'spark.submit.pyFiles', u''), (u'spark.app.name', u' PySparkShell'), (u'spark.executor.extraClassPath', u'/root/ephemeral-hdfs/conf'), (u'spark.master', u'spark://.compute-1.amazonaws.com:7077')] file = sc.textFile(hdfs:///user/root/chekhov.txt) file.take(2) [uProject Gutenberg's Plays by Chekhov, Second Series, by Anton Chekhov, u''] lines = file.filter(lambda x: len(x) 0) lines.count() VARIOUS ERROS DISCUSSED BELOW My first thought was that I could simply get away with including anaconda on the base AMI, point the path at /dir/anaconda/bin, and bake a new one. Doing so resulted in some strange py4j errors like the following: Py4JError: An error occurred while calling o17.partitions. Trace: py4j.Py4JException: Method partitions([]) does not exist At some point I also saw: SystemError: Objects/cellobject.c:24: bad argument to internal function which is really strange, possibly the result of a version mismatch? I had another thought of building spark from master on the AMI, leaving the spark directory in place, and removing the spark call from the modules list in spark-ec2 launch script. 
Unfortunately, this resulted in the following errors: https://gist.github.com/quasiben/da0f4778fbc87d02c088 If a spark dev was willing to make some time in the near future, I'm sure she/he and I could sort out these issues and give the Spark community a python distro ready to go for numerical computing. For instance, I'm not sure how pyspark launches a python session on a slave. Is this done as root or as the hadoop user? (I believe I changed /etc/bashrc to point to my anaconda bin directory, so it shouldn't really matter.) Is there something special about the py4j zip included in the spark dir compared with the py4j on PyPI? Thoughts? --Ben
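For reference, Jey's PYSPARK_PYTHON suggestion amounts to a one-line change; a minimal sketch of the relevant line in spark/conf/spark-env.sh, assuming Anaconda is installed at /opt/anaconda on every node (the path is an assumption):

export PYSPARK_PYTHON=/opt/anaconda/bin/python  # assumed install path; must exist on driver and workers alike

The file then needs to be copied out to the workers and the cluster restarted, as Jey describes above, before executors will pick up the new interpreter.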
Re: Supported SQL syntax in Spark SQL
Actually, this looks like it's some kind of regression in 1.0.1, perhaps related to assembly and packaging with spark-ec2. I don’t see this issue with the same data on a 1.0.0 EC2 cluster. How can I trace this down for a bug report? Nick On Sun, Jul 13, 2014 at 11:18 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: For example, are LIKE 'string%' queries supported? Trying one on 1.0.1 yields java.lang.ExceptionInInitializerError. Nick
Catalyst dependency on Spark Core
As per the recent presentation given at Scala Days (http://people.apache.org/~marmbrus/talks/SparkSQLScalaDays2014.pdf), it was mentioned that Catalyst is independent of Spark. But on inspecting the pom.xml of the sql/catalyst module, it seems to have a dependency on Spark Core. Is there a particular reason for the dependency? I would love to use Catalyst outside Spark. (Reposted as the previous email bounced; sorry if this is a duplicate.)
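For anyone who wants to experiment anyway, the catalyst module is built as its own artifact; a minimal sketch of pulling it into an sbt build, assuming the 1.0.1 spark-catalyst artifact is published to Maven Central (the coordinates are an assumption to verify, and the Spark Core dependency noted above would come along transitively):

// hypothetical build.sbt entry; check the artifact name and version before relying on them
libraryDependencies += "org.apache.spark" %% "spark-catalyst" % "1.0.1"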
Re: Supported SQL syntax in Spark SQL
Are you sure the code running on the cluster has been updated? We recently optimized the execution of LIKE queries that can be evaluated without using full regular expressions, so it's possible this error is due to missing functionality on the executors. How can I trace this down for a bug report? If the above doesn't fix it, the following would be helpful:
- The full stack trace
- The queryExecution from the SchemaRDD (i.e. println(sql("SELECT ...").queryExecution))
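To make that concrete, a minimal sketch of collecting both pieces of information (Scala, assuming an SQLContext named sqlContext and a registered table named people; both names are placeholders):

val query = sqlContext.sql("SELECT name FROM people WHERE name LIKE 'Mich%'")
println(query.queryExecution)  // prints the logical and physical plans, useful for the bug report
query.collect()                // forces execution; an executor-side failure surfaces here with its stack trace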
Re: Streaming. Cannot get socketTextStream to receive anything.
Hi, I experienced exactly the same problem when using a SparkContext with the local[1] master specification: in that case the single thread is used for receiving data, leaving none for processing, so nothing gets processed. Once you shut down the connection, the receiver thread is freed up for processing. Any chance you ran into the same issue? Tobias On Mon, Jul 14, 2014 at 11:45 AM, kytay kaiyang@gmail.com wrote: Hi Akhil Das Thanks. I tried the code, and it works. There was a problem with my socket code: it was not flushing the content out, and for the test tool, Hercules, I had to close the socket connection to flush the content out. I am going to troubleshoot why nc works while my code and the test tool don't. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p9576.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
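If the thread count is the culprit, the fix is a one-word change; a minimal sketch using the Scala streaming API, assuming a server on localhost:9999 (host, port, and app name are placeholders):

import org.apache.spark.streaming.{Seconds, StreamingContext}

// "local[2]" is the key detail: one thread receives while the other processes
val ssc = new StreamingContext("local[2]", "SocketTest", Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
lines.print()
ssc.start()
ssc.awaitTermination()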
SPARK S3 LZO input; worker stuck
Hi, I'm trying to read LZO compressed files from S3 using Spark. The LZO files are not indexed. The Spark job starts to read the files just fine, but after a while it just hangs, with no network throughput, and I have to restart the worker process to get it back up. Any idea what could be causing this? We were using uncompressed files before and that worked just fine; we went with compression to reduce S3 storage. Any help would be appreciated. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-S3-LZO-input-worker-stuck-tp9584.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
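One thing worth ruling out is how the files are being read; a minimal sketch of going through the hadoop-lzo input format explicitly (Scala), assuming the hadoop-lzo jar is on the executor classpath and the bucket path is a placeholder:

import com.hadoop.mapreduce.LzoTextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

// Without an .index file each .lzo file is read as a single non-splittable task,
// so one large file can pin a single worker for a long time.
val lzo = sc.newAPIHadoopFile[LongWritable, Text, LzoTextInputFormat]("s3n://my-bucket/ngrams/*.lzo")
val lines = lzo.map(_._2.toString)
println(lines.take(5).mkString("\n"))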
Re: Supported SQL syntax in Spark SQL
Are you sure the code running on the cluster has been updated? I launched the cluster using spark-ec2 from the 1.0.1 release, so I’m assuming that’s taken care of, at least in theory. I just spun down the clusters I had up, but I will revisit this tomorrow and provide the information you requested. Nick
Re: Powered By Spark: Can you please add our org?
Awesome! Thanks, Reynold! On Tue, Jul 8, 2014 at 4:00 PM, Reynold Xin r...@databricks.com wrote: I added you to the list. Cheers. On Mon, Jul 7, 2014 at 6:19 PM, Alex Gaudio adgau...@gmail.com wrote: Hi, Sailthru is also using Spark. Could you please add us to the Powered By Spark page (https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark) when you have a chance? Organization Name: Sailthru URL: www.sailthru.com Short Description: Our data science platform uses Spark to build predictive models and recommendation systems for marketing automation and personalization. Thank you! Alex
Re: Streaming. Cannot get socketTextStream to receive anything.
Hi Tobias, I have been using local[4] to test. My problem is likely caused by the tcp host server that I am trying to emulate (although I am not sure at the moment :D). The first way I tried was a tcp tool called Hercules. The second way was to write simple socket code to send a message at an interval, like the one shown in #2 of my first post. I suspect the reason it doesn't work is that the messages are not flushed, so no message is received by Spark Streaming. I think I will need to do more testing to understand the behavior. I am currently not sure why nc -lk works while the other tools and code I am testing with don't. Regards. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Solved-Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p9588.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
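Since the suspicion is buffering, a minimal sketch of a test server that flushes every line may help isolate it (Scala; the port, message, and object name are placeholders):

import java.io.PrintWriter
import java.net.ServerSocket

object FlushingServer {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(9999)        // same port the DStream connects to
    val socket = server.accept()               // blocks until Spark's receiver connects
    val out = new PrintWriter(socket.getOutputStream)
    while (true) {
      out.println("hello " + System.currentTimeMillis)
      out.flush()                              // without this, lines can sit in the buffer, matching the symptom described
      Thread.sleep(1000)
    }
  }
}

nc -lk flushes as it writes, which would explain why it works while a non-flushing sender does not.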