RE: Spark SQL question: why build hashtable for both sides in HashOuterJoin?
Liquan, yes, for full outer join, one hash table on both sides is more efficient. For the left/right outer join, it looks like one hash table should be enough.

From: Liquan Pei [mailto:liquan...@gmail.com] Sent: September 30, 2014 18:34 To: Haopu Wang Cc: d...@spark.apache.org; user Subject: Re: Spark SQL question: why build hashtable for both sides in HashOuterJoin?

Hi Haopu, How about full outer join? One hash table may not be efficient for this case. Liquan

On Mon, Sep 29, 2014 at 11:47 PM, Haopu Wang hw...@qilinsoft.com wrote: Hi Liquan, thanks for the response. In your example, I think the hash table should be built on the right side, so Spark can iterate through the left side and find matches in the right side from the hash table efficiently. Please comment and suggest, thanks again!

From: Liquan Pei [mailto:liquan...@gmail.com] Sent: September 30, 2014 12:31 To: Haopu Wang Cc: d...@spark.apache.org; user Subject: Re: Spark SQL question: why build hashtable for both sides in HashOuterJoin?

Hi Haopu, My understanding is that the hash table on both the left and right side is used for including null values in the result in an efficient manner. If the hash table is only built on one side, say the left side, and we perform a left outer join, then for each row on the left side a scan over the right side is needed to make sure there are no matching tuples for that row. Hope this helps! Liquan

On Mon, Sep 29, 2014 at 8:36 PM, Haopu Wang hw...@qilinsoft.com wrote: I took a look at HashOuterJoin and it builds a hash table for both sides. This consumes quite a lot of memory when the partition is big. And it doesn't reduce the iteration on the streamed relation, right? Thanks!

-- Liquan Pei Department of Physics University of Massachusetts Amherst
Re: window every n elements instead of time based
Hi Michael, I think you mean the batch interval rather than windowing. It can be helpful for cases where you do not want to process very small batch sizes. The HDFS sink in Flume has the concept of rolling files based on time, number of events or size: https://flume.apache.org/FlumeUserGuide.html#hdfs-sink The same could be applied to Spark if the use case demands it. The only major catch would be that it breaks the concept of window operations that exist in Spark. Thanks, Jayant

On Tue, Oct 7, 2014 at 10:19 PM, Michael Allman mich...@videoamp.com wrote: Hi Andrew, The use case I have in mind is batch data serialization to HDFS, where sizing files to a certain HDFS block size is desired. In my particular use case, I want to process 10GB batches of data at a time. I'm not sure this is a sensible use case for Spark Streaming, and I was trying to test it. However, I had trouble getting it working and in the end I decided it was more trouble than it was worth. So I decided to split my task into two: one streaming job on small, time-defined batches of data, and a traditional Spark job aggregating the smaller files into a larger whole. In retrospect, I think this is the right way to go, even if a count-based window specification were possible. Therefore, I can't suggest my use case for a count-based window size. Cheers, Michael

On Oct 5, 2014, at 4:03 PM, Andrew Ash and...@andrewash.com wrote: Hi Michael, I couldn't find anything in Jira for it -- https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22window%22%20AND%20component%20%3D%20Streaming Could you or Adrian please file a Jira ticket explaining the functionality and maybe a proposed API? This will help people interested in count-based windowing to understand the state of the feature in Spark Streaming. Thanks! Andrew

On Fri, Oct 3, 2014 at 4:09 PM, Michael Allman mich...@videoamp.com wrote: Hi, I also have a use for count-based windowing. I'd like to process data batches by size as opposed to time. Is this feature on the development roadmap? Is there a JIRA ticket for it? Thank you, Michael
Re: window every n elements instead of time based
Yes, I meant batch interval. Thanks for clarifying. Cheers, Michael

On Oct 7, 2014, at 11:14 PM, jayant [via Apache Spark User List] ml-node+s1001560n15904...@n3.nabble.com wrote: Hi Michael, I think you mean the batch interval rather than windowing. ...
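For readers landing on this thread later, here is a minimal sketch of the batch-interval approach being discussed, assuming the Spark Streaming 1.x API; the socket source, the ten-minute interval and the output path are placeholders, not values from the thread:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // The batch interval is fixed when the StreamingContext is created, so
    // "bigger batches" today means a longer interval, not a count-based window.
    val conf = new SparkConf().setAppName("batch-interval-example")
    val ssc = new StreamingContext(conf, Seconds(600))   // one batch every 10 minutes

    val lines = ssc.socketTextStream("localhost", 9999)  // placeholder source
    lines.saveAsTextFiles("hdfs:///tmp/batches/part")    // one output directory per batch

    ssc.start()
    ssc.awaitTermination()

A second, non-streaming job can then compact those per-batch directories into block-sized files, which matches the two-job split Michael describes above.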
Re: Parsing one big multiple line .xml loaded in RDD using Python
Thank you, this seems to be the way to go, but unfortunately, when I'm trying to use sc.wholeTextFiles() on a file stored on Amazon S3 I'm getting the following error:

14/10/08 06:09:50 INFO input.FileInputFormat: Total input paths to process : 1
14/10/08 06:09:50 INFO input.FileInputFormat: Total input paths to process : 1
Traceback (most recent call last):
File /root/distributed_rdd_test.py, line 27, in module
result = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10)
File /root/spark/python/pyspark/rdd.py, line 1126, in take
totalParts = self._jrdd.partitions().size()
File /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 538, in __call__
File /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o30.partitions.
: java.io.FileNotFoundException: File does not exist: /wikiinput/wiki.xml.gz
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:517)
at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat$OneFileInfo.init(CombineFileInputFormat.java:489)
at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getMoreSplits(CombineFileInputFormat.java:280)
at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:240)
at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:220)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:56)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:50)
at org.apache.spark.api.java.JavaRDD.partitions(JavaRDD.scala:32)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)

My code is the following:

sc = SparkContext(appName="Process wiki")
distData = sc.wholeTextFiles('s3n://wiki-dump/wikiinput')
result = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10)
for item in result: print item.getvalue()
sc.stop()

So my question is: is it possible to read whole files from S3? Based on the documentation it should be possible, but it seems that it does not work for me.

__ From: Davies Liu dav...@databricks.com To: jan.zi...@centrum.cz Date: 07.10.2014 17:38 Subject: Re: Parsing one big multiple line .xml loaded in RDD using Python CC: u...@spark.incubator.apache.org

Maybe sc.wholeTextFile() is what you want, you can get the whole text and parse it by yourself.
On Tue, Oct 7, 2014 at 1:06 AM, jan.zi...@centrum.cz wrote: Hi, I have already unsuccessfully asked a quite similar question on Stack Overflow, particularly here: http://stackoverflow.com/questions/26202978/spark-and-python-trying-to-parse-wikipedia-using-gensim. I've also tried a workaround, again unsuccessfully; the workaround problem can be found at http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Python-using-generator-of-data-bigger-than-RAM-as-input-to-sc-parallelize-td15789.html. In particular, what I'm trying to do: I have an .xml dump of Wikipedia as the input. The .xml is quite big and it spreads across multiple lines. You can check it out at http://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2. My goal is to parse this .xml in the same way as gensim.corpora.wikicorpus.extract_pages does; the implementation is at https://github.com/piskvorky/gensim/blob/develop/gensim/corpora/wikicorpus.py
Re: How to do broadcast join in SparkSQL
Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not merged into master? I cannot find spark.sql.hints.broadcastTables in latest master, but it's in the following patch. https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5 Jianshi On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Yes, looks like it can only be controlled by the parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird to me. How am I suppose to know the exact bytes of a table? Let me specify the join algorithm is preferred I think. Jianshi On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu yuzhih...@gmail.com wrote: Have you looked at SPARK-1800 ? e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala Cheers On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: I cannot find it in the documentation. And I have a dozen dimension tables to (left) join... Cheers, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Reading from HBase is too slow
Hi Sean, Do I need to specify the number of executors when submitting the job? I suppose the number of executors will be determined by the number of regions of the table, just like a MapReduce job, where you needn't specify the number of map tasks when reading from an HBase table. The script to submit my job can be seen in my second post; please refer to that.

2014-10-08 13:44 GMT+08:00 Sean Owen so...@cloudera.com: How did you run your program? I don't see from your earlier post that you ever asked for more executors.

On Wed, Oct 8, 2014 at 4:29 AM, Tao Xiao xiaotao.cs@gmail.com wrote: I found the reason why reading HBase is too slow. Although each regionserver serves multiple regions for the table I'm reading, the number of Spark workers allocated by Yarn is too low. Actually, I could see that the table has dozens of regions spread over about 20 regionservers, but only two Spark workers are allocated by Yarn. What is worse, the two workers run one after the other, so the Spark job loses parallelism. So now the question is: why are only 2 workers allocated?
Re: Spark SQL question: why build hashtable for both sides in HashOuterJoin?
I'm pretty sure inner joins on Spark SQL already build only one of the sides. Take a look at ShuffledHashJoin, which calls HashJoin.joinIterators. Only outer joins do both, and it seems like we could optimize it for those that are not full. Matei

On Oct 7, 2014, at 11:04 PM, Haopu Wang hw...@qilinsoft.com wrote: Liquan, yes, for full outer join, one hash table on both sides is more efficient. For the left/right outer join, it looks like one hash table should be enough. ...
Re: Reading from HBase is too slow
You do need to specify the number of executor cores to use. Executors are not like mappers. After all they may do much more in their lifetime than just read splits from HBase so would not make sense to determine it by something that the first line of the program does. On Oct 8, 2014 8:00 AM, Tao Xiao xiaotao.cs@gmail.com wrote: Hi Sean, Do I need to specify the number of executors when submitting the job? I suppose the number of executors will be determined by the number of regions of the table. Just like a MapReduce job, you needn't specify the number of map tasks when reading from a HBase table. The script to submit my job can be seen in my second post. Please refer to that. 2014-10-08 13:44 GMT+08:00 Sean Owen so...@cloudera.com: How did you run your program? I don't see from your earlier post that you ever asked for more executors. On Wed, Oct 8, 2014 at 4:29 AM, Tao Xiao xiaotao.cs@gmail.com wrote: I found the reason why reading HBase is too slow. Although each regionserver serves multiple regions for the table I'm reading, the number of Spark workers allocated by Yarn is too low. Actually, I could see that the table has dozens of regions spread over about 20 regionservers, but only two Spark workers are allocated by Yarn. What is worse, the two workers run one after one. So, the Spark job lost parallelism. So now the question is : Why are only 2 workers allocated?
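As a concrete illustration of Sean's point, a hypothetical submit command for a YARN job that reads from HBase might look like the following; the class name, jar and resource numbers are placeholders, not values from this thread:

    spark-submit --master yarn-cluster \
      --num-executors 20 \
      --executor-cores 2 \
      --executor-memory 2g \
      --class com.example.ReadFromHBase \
      my-hbase-job.jar

The number of tasks in the HBase stage still comes from the number of regions, but without asking YARN for executors up front there is nowhere for those tasks to run in parallel.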
Re: Spark SQL question: why build hashtable for both sides in HashOuterJoin?
I am working on a PR to leverage the HashJoin trait code to optimize the left/right outer join. It's already been tested locally and I will send out the PR soon after some clean up. Thanks, Liquan

On Wed, Oct 8, 2014 at 12:09 AM, Matei Zaharia matei.zaha...@gmail.com wrote: I'm pretty sure inner joins on Spark SQL already build only one of the sides. Take a look at ShuffledHashJoin, which calls HashJoin.joinIterators. Only outer joins do both, and it seems like we could optimize it for those that are not full. Matei

-- Liquan Pei Department of Physics University of Massachusetts Amherst
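To make the idea in this thread concrete, here is a minimal Scala sketch (not the actual Spark SQL implementation) of a per-partition left outer join that builds a hash table only on the right side and streams the left side; unmatched left rows are emitted with None, so no per-row scan of the right side is needed:

    // Build a hash table over the right (build) side only, then probe it while
    // streaming the left side. Left rows with no match are emitted with None.
    def leftOuterJoinPartition[K, L, R](
        left: Iterator[(K, L)],
        right: Iterator[(K, R)]): Iterator[(K, (L, Option[R]))] = {
      val buildTable = right.toSeq.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2)) }
      left.flatMap { case (k, l) =>
        buildTable.get(k) match {
          case Some(rs) => rs.iterator.map(r => (k, (l, Some(r))))
          case None     => Iterator((k, (l, None: Option[R])))
        }
      }
    }

    val result = leftOuterJoinPartition(
      Iterator(("a", 1), ("b", 2)),
      Iterator(("a", "x"))).toList
    // List((a,(1,Some(x))), (b,(2,None)))

A full outer join is the case that genuinely needs visibility into both sides, since unmatched rows from the build side must also be emitted, which is why a single hash table is less natural there.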
Mosek Solver with Apache Spark
Hi, Has anyone tried the Mosek (http://www.mosek.com/) solver in Spark? I'm getting weird serialization errors. I came to know that Mosek uses shared (native) libraries, which may not be serializable. Is this the reason for the errors, or is it working for anyone? -- Regards, Raghuveer Chanda 4th year Undergraduate Student Computer Science and Engineering IIT Kharagpur
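A pattern that often sidesteps this class of error (offered as a guess at the situation, since the exact Mosek API isn't shown here) is to avoid shipping the solver object from the driver at all and instead construct it inside mapPartitions, so each executor builds its own instance from the native library available on that node. NativeSolver below is a made-up stand-in, not the Mosek API:

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical stand-in for a class backed by a native/shared library.
    // The real point is only where it gets constructed.
    class NativeSolver {
      def solve(x: Double): Double = x * 2.0
    }

    object SolverExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("solver-example"))
        val problems = sc.parallelize(1 to 100).map(_.toDouble)

        val solutions = problems.mapPartitions { iter =>
          val solver = new NativeSolver()   // created on the executor, never serialized
          iter.map(solver.solve)            // one instance reused for the whole partition
        }
        println(solutions.count())
        sc.stop()
      }
    }

The closure passed to mapPartitions captures nothing non-serializable; the native library only has to be available on every worker node.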
Re: Any issues with repartition?
Looks like an OOM issue? Have you tried persisting your RDDs to allow disk writes? I've seen a lot of similar crashes in a Spark app that reads from HDFS and does joins. I.e. I've seen java.io.IOException: Filesystem closed, Executor lost, FetchFailed, etc. with non-deterministic crashes. I've tried persisting RDDs, tuning other params, and verifying that the Executor JVMs don't come close to their max allocated memory during operation.

Looking through user@ tonight, there are a ton of email threads with similar crashes and no answers. It looks like a lot of people are struggling with OOMs. Could one of the Spark committers please comment on this thread, or one of the other unanswered threads with similar crashes? Is this simply how Spark behaves if Executors OOM? What can the user do other than increase memory or reduce RDD size? (And how can one deduce how much of either is needed?)

One general workaround for OOMs could be to programmatically break the job input (i.e. from HDFS, input from #parallelize()) into chunks, and only create/process RDDs related to one chunk at a time. However, this approach has the limitations of Spark Streaming and no formal library support. What might be nice is that if tasks fail, Spark could try to re-partition in order to avoid OOMs.

On Fri, Oct 3, 2014 at 2:55 AM, jamborta jambo...@gmail.com wrote: I have two nodes with 96G ram 16 cores, my setup is as follows:

conf = (SparkConf()
    .setMaster("yarn-cluster")
    .set("spark.executor.memory", "30G")
    .set("spark.cores.max", "32")
    .set("spark.executor.instances", "2")
    .set("spark.executor.cores", "8")
    .set("spark.akka.timeout", "1")
    .set("spark.akka.askTimeout", "100")
    .set("spark.akka.frameSize", "500")
    .set("spark.cleaner.ttl", "86400")
    .set("spark.tast.maxFailures", "16")
    .set("spark.worker.timeout", "150"))

thanks a lot,
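For what it's worth, a small sketch of the persist-to-disk suggestion above, assuming an existing SparkContext sc; the path, the keying lambda and the partition count are illustrative only:

    import org.apache.spark.storage.StorageLevel

    // Allow partitions that do not fit in memory to spill to local disk instead
    // of being recomputed or blowing up the heap.
    val bigRdd = sc.textFile("hdfs:///data/events")        // placeholder path
      .map(line => (line.split("\t")(0), line))            // key by first field
      .persist(StorageLevel.MEMORY_AND_DISK_SER)

    // More, smaller partitions also reduce the memory each task needs during a shuffle.
    val repartitioned = bigRdd.repartition(400)
    println(repartitioned.count())

This does not remove the underlying memory pressure, but MEMORY_AND_DISK_SER plus a higher partition count is usually the first thing to try before resizing executors.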
Re: How to do broadcast join in SparkSQL
Ok, currently there's cost-based optimization, however Parquet statistics are not implemented... What's a good way if I want to join a big fact table with several tiny dimension tables in Spark SQL (1.1)? I wish we could allow a user hint for the join. Jianshi

On Wed, Oct 8, 2014 at 2:18 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not merged into master? I cannot find spark.sql.hints.broadcastTables in latest master, but it's in the following patch: https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5 Jianshi

On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Yes, looks like it can only be controlled by the parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird to me. How am I supposed to know the exact size of a table in bytes? Letting me specify the preferred join algorithm would be better, I think. Jianshi

On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu yuzhih...@gmail.com wrote: Have you looked at SPARK-1800? e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala Cheers

On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: I cannot find it in the documentation. And I have a dozen dimension tables to (left) join... Cheers,

-- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
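In the meantime, the only knob mentioned in this thread for Spark SQL 1.1 is the byte threshold itself; a hedged sketch of using it follows, assuming an existing SparkContext sc and already-registered tables — the table names and the 50 MB figure are illustrative:

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    // Tables whose estimated size falls under this many bytes become the broadcast
    // side of the join, so raising it is the blunt way to get small dimension
    // tables (whose sizes are known to the planner) broadcast.
    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)

    val joined = sqlContext.sql(
      """SELECT f.*, d.name
        |FROM fact_table f
        |LEFT OUTER JOIN dim_table d ON f.dim_id = d.id""".stripMargin)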
Interactive interface tool for spark
Hi, All We need an interactive interface tool for Spark in which we can run Spark jobs and plot graphs to explore the data interactively. IPython Notebook is good, but it only supports Python (we want one supporting Scala)... BR, Kevin.
Re: return probability \ confidence instead of actual class
OK, let me rephrase my question once again. Python-wise I prefer .predict_proba(X) to .decision_function(X) since it is easier for me to interpret the results. As far as I can see, the latter functionality is already implemented in Spark (well, in version 0.9.2 for example I have to compute the dot product on my own, otherwise I get 0 or 1), but the former is not implemented (yet!). What should I do / how should I implement that one in Spark as well? What are the required inputs here and what does the formula look like?

On Tue, Oct 7, 2014 at 10:04 PM, Sean Owen so...@cloudera.com wrote: It looks like you are directly computing the SVM decision function in both cases:

val predictions2 = m_users_double.map { point =>
  point.zip(weights).map(a => a._1 * a._2).sum + intercept
}.cache()

clf.decision_function(T)

This does not give you +1/-1 in SVMs (well... not for most points, which will be outside the margin around the separating hyperplane). You can use the predict() function in SVMModel -- which will give you 0 or 1 (rather than +/- 1, but that's just a differing convention) depending on the sign of the decision function. I don't know if this was in 0.9. At the moment I assume you saw small values of the decision function in scikit because of the radial basis function.

On Tue, Oct 7, 2014 at 7:45 PM, Sunny Khatri sunny.k...@gmail.com wrote: Not familiar with the scikit SVM implementation (and I assume you are using LinearSVC). To figure out an optimal decision boundary based on the scores obtained, you can use an ROC curve, varying your thresholds.
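A small sketch of what is available on the Spark side, assuming a 1.x MLlib release and an existing SparkContext sc: SVMModel.clearThreshold() makes predict() return the raw margin w·x + b instead of a 0/1 label. Calibrated probabilities like scikit-learn's predict_proba are not provided, so they would need a separate calibration step on top of these scores. The tiny training set is just for illustration:

    import org.apache.spark.mllib.classification.SVMWithSGD
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    val training = sc.parallelize(Seq(
      LabeledPoint(1.0, Vectors.dense(1.0, 0.5)),
      LabeledPoint(0.0, Vectors.dense(-1.0, -0.5))))

    val model = SVMWithSGD.train(training, 100)   // (data, numIterations)
    model.clearThreshold()                        // predict() now returns the raw score

    // Raw decision values alongside the true labels; larger positive values mean
    // higher confidence in the positive class.
    val scored = training.map(p => (model.predict(p.features), p.label))
    scored.collect().foreach(println)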
org/I0Itec/zkclient/serialize/ZkSerializer ClassNotFound
Hello, I have been developing a Spark Streaming application using Kafka, which runs successfully on my Macbook. I am now trying to run it on an AWS Ubuntu spark cluster... and I receive a ClassNotFoundException. Kafka 0.8.1.1 Spark 1.1.0 I am submitting the job like this: /opt/spark/bin/spark-submit --jars /opt/spark-streaming-kafka_2.10-1.1.0.jar,/opt/kafka/libs/zkclient-0.3.jar --class com.mycompanyname.Main spark-job-assembly-0.1.1.jar I have also verified that a class with that name appears in the jar using: jar -tf spark-job-assembly-0.1.1.jar | grep org/I0Itec/zkclient/serialize/ZkSerializer The full stacktrace is below. Has anyone seen this before, and if so, how do I get it to recognise that ZkSerializer is present? Thanks, Colin 14/10/08 09:18:21 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 70, worker2): java.lang.NoClassDefFoundError: org/I0Itec/zkclient/serialize/ZkSerializer java.lang.Class.getDeclaredFields0(Native Method) java.lang.Class.privateGetDeclaredFields(Class.java:2436) java.lang.Class.getDeclaredField(Class.java:1946) java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659) java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72) java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480) java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468) java.security.AccessController.doPrivileged(Native Method) java.io.ObjectStreamClass.init(ObjectStreamClass.java:468) java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365) java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602) java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622) java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500) org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:74) sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:606) java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745)
Spark and tree data structures
Hi all, I'd like to use an octree data structure in order to simplify several computations on a big data set. I've been wondering if Spark has any built-in options for such structures (the only thing I could find is the DecisionTree), especially if they make use of RDDs. I've also been exploring the possibility of using key-value pairs to simulate a tree's structure within an RDD, but this makes the program a lot harder to understand and limits my options when processing the data. Any advice is very welcome, thanks in advance. Regards, Silvina
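For what it's worth, one way to sketch the key-value encoding mentioned above (purely illustrative; it assumes an existing SparkContext sc, coordinates already scaled into the unit cube, and a fixed maximum depth): key each point by the string of octant indices on the path from the root, so that points sharing a key prefix belong to the same subtree and subtree-level work becomes ordinary key-based RDD operations.

    import org.apache.spark.SparkContext._   // pair-RDD operations in Spark 1.x

    case class Point(x: Double, y: Double, z: Double)

    // Path of octant indices (0-7) from the root down to `depth`, assuming
    // coordinates normalized to [0, 1).
    def octantPath(p: Point, depth: Int): String = {
      var (cx, cy, cz) = (0.5, 0.5, 0.5)
      var half = 0.25
      val sb = new StringBuilder
      for (_ <- 0 until depth) {
        val oct = (if (p.x >= cx) 4 else 0) + (if (p.y >= cy) 2 else 0) + (if (p.z >= cz) 1 else 0)
        sb.append(oct)
        cx += (if (p.x >= cx) half else -half)
        cy += (if (p.y >= cy) half else -half)
        cz += (if (p.z >= cz) half else -half)
        half /= 2
      }
      sb.toString
    }

    val points = sc.parallelize(Seq(Point(0.1, 0.2, 0.3), Point(0.9, 0.8, 0.7), Point(0.12, 0.21, 0.29)))
    val keyed = points.map(p => (octantPath(p, 3), p))
    val perLeaf = keyed.groupByKey()          // one entry per occupied leaf at depth 3
    perLeaf.mapValues(_.size).collect().foreach(println)

Prefix-based keys also allow coarser groupings (e.g. the first two characters for depth-2 cells) without rebuilding anything, which is usually the main thing one wants from the tree.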
Re: SparkContext.wholeTextFiles() java.io.FileNotFoundException: File does not exist:
My additional question is whether this problem can possibly be caused by the fact that my file is bigger than the RAM memory across the whole cluster?

__ Hi, I'm trying to use sc.wholeTextFiles() on a file stored on Amazon S3 and I'm getting the following error:

14/10/08 06:09:50 INFO input.FileInputFormat: Total input paths to process : 1
14/10/08 06:09:50 INFO input.FileInputFormat: Total input paths to process : 1
Traceback (most recent call last):
File /root/distributed_rdd_test.py, line 27, in module
result = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10)
File /root/spark/python/pyspark/rdd.py, line 1126, in take
totalParts = self._jrdd.partitions().size()
File /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 538, in __call__
File /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o30.partitions.
: java.io.FileNotFoundException: File does not exist: /wikiinput/wiki.xml.gz
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:517)
at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat$OneFileInfo.init(CombineFileInputFormat.java:489)
at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getMoreSplits(CombineFileInputFormat.java:280)
at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:240)
at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:220)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:56)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:50)
at org.apache.spark.api.java.JavaRDD.partitions(JavaRDD.scala:32)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)

My code is the following:

sc = SparkContext(appName="Process wiki")
distData = sc.wholeTextFiles('s3n://wiki-dump/wikiinput')
result = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10)
for item in result: print item.getvalue()
sc.stop()

So my question is, is it possible to read whole files from S3? Based on the documentation it should be possible, but it seems that it does not work for me.

When I do just:

sc = SparkContext(appName="Process wiki")
distData = sc.wholeTextFiles('s3n://wiki-dump/wikiinput').take(10)
print distData

Then the error that I'm getting is exactly the same. Thank you in advance for any advice.
Error reading from Kafka
Hi, I'm trying to read from Kafka. I was able to do it correctly with this method:

def createStream(
    ssc: StreamingContext,
    zkQuorum: String,
    groupId: String,
    topics: Map[String, Int],
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[(String, String)]

But now I have to add some params to the Kafka consumer, so I've changed to the other createStream method, and I'm getting an error:

14/10/08 15:34:10 INFO receiver.ReceiverSupervisorImpl: Deregistering receiver 0
14/10/08 15:34:10 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoSuchMethodException: scala.runtime.Nothing$.<init>(kafka.utils.VerifiableProperties)
at java.lang.Class.getConstructor0(Class.java:2849)
at java.lang.Class.getConstructor(Class.java:1718)
at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:106)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

This is my code. It seems that createStream returns ReceiverInputDStream[(Nothing, Nothing)] (forced by me to (String, String)), so I think it tries to getConstructor(kafka.utils.VerifiableProperties) by reflection on the Nothing object and doesn't find the method.

val topics = config.getString("nessus.kafka.topics")
val numThreads = config.getInt("nessus.kafka.numThreads")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val kafkaParams = Map(
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> "my-grp")
val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,
  kafkaParams,
  topicMap, StorageLevel.MEMORY_AND_DISK_SER_2)

I found this issue https://issues.apache.org/jira/browse/SPARK-2103 but it was solved, and I'm using Spark 1.1.0 and Scala 2.10, so I don't know what happens. Any thoughts?

-- http://www.stratio.com/ Avenida de Europa, 26. Ática 5. 3ª Planta 28224 Pozuelo de Alarcón, Madrid Tel: +34 91 352 59 42 // @stratiobd https://twitter.com/StratioBD
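A hedged guess at the fix, matching the inference problem described above but untested here: the generic createStream overload takes the key/value types and their decoders as type parameters, and without spelling them out Scala infers Nothing, which the receiver then tries to instantiate reflectively — hence the NoSuchMethodException. Something like:

    import kafka.serializer.StringDecoder
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Explicit type parameters: key type, value type, key decoder, value decoder.
    val kafkaStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc,
      kafkaParams,
      topicMap,
      StorageLevel.MEMORY_AND_DISK_SER_2)

Here ssc, kafkaParams and topicMap are the same values as in the message above.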
RE: MLLib Linear regression
Hi Xiangrui, Changing the default step size to 0.01 made a huge difference. The results make sense when I use A + B + C + D. MSE is ~0.07 and the outcome matches the domain knowledge. I was wondering, is there any documentation on the parameters and when/how to vary them?

Date: Tue, 7 Oct 2014 15:11:39 -0700 Subject: Re: MLLib Linear regression From: men...@gmail.com To: ssti...@live.com CC: user@spark.apache.org

Did you test different regularization parameters and step sizes? In the combination that works, I don't see A + D. Did you test that combination? Is there any linear dependency between A's columns and D's columns? -Xiangrui

On Tue, Oct 7, 2014 at 1:56 PM, Sameer Tilak ssti...@live.com wrote: BTW, one detail: When the number of iterations is 100, all weights are zero or below and the indices are only from set A. When the number of iterations is 150, I see 30+ non-zero weights (when sorted by weight) and the indices are distributed across all sets; however MSE is high (5.xxx) and the result does not match the domain knowledge. When the number of iterations is 400, I see 30+ non-zero weights (when sorted by weight) and the indices are distributed across all sets; however MSE is high (6.xxx) and the result does not match the domain knowledge. Any help will be highly appreciated.

From: ssti...@live.com To: user@spark.apache.org Subject: MLLib Linear regression Date: Tue, 7 Oct 2014 13:41:03 -0700

Hi All, I have the following classes of features: class A: 15000 features, class B: 170 features, class C: 900 features, class D: 6000 features. I use linear regression (over sparse data). I get excellent results with low RMSE (~0.06) for the following combinations of classes: 1. A + B + C 2. B + C + D 3. A + B 4. A + C 5. B + D 6. C + D 7. D. Unfortunately, when I use A + B + C + D (all the features) I get results that don't make any sense -- all weights are zero or below and the indices are only from set A. I also get high MSE. I changed the number of iterations from 100 to 150, 250, or even 400. I still get an MSE of 5 or 6. Are there any other parameters that I can play with? Any insight on what could be wrong? Is it somehow not able to scale up to 22K features? (I highly doubt that.)
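For anyone hitting the same wall, a minimal sketch of setting the step size explicitly rather than relying on the default (the 0.01 value is simply what worked for this data set; the tiny sparse training set and an existing SparkContext sc are placeholders):

    import org.apache.spark.SparkContext._
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}

    val training = sc.parallelize(Seq(
      LabeledPoint(1.0, Vectors.sparse(5, Array(0, 3), Array(1.0, 2.0))),
      LabeledPoint(0.0, Vectors.sparse(5, Array(1, 4), Array(1.5, 0.5)))))

    // train(data, numIterations, stepSize): a smaller step size converges more
    // slowly but avoids the divergence that shows up as huge or nonsensical weights.
    val model = LinearRegressionWithSGD.train(training, 200, 0.01)

    val mse = training.map { p =>
      val err = model.predict(p.features) - p.label
      err * err
    }.mean()
    println(s"MSE = $mse")

Since the appropriate step size depends on the scale of the features (the Lipschitz constant Xiangrui mentions elsewhere in this thread), it usually has to be re-tuned whenever the feature set changes.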
Re: foreachPartition: write to multiple files
Hi, I finally found a solution after reading the post: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-split-RDD-by-key-and-save-to-different-path-td11887.html#a11983
Re: Interactive interface tool for spark
Hi Andy, This sounds awesome. Please keep us posted. Meanwhile, can you share a link to your project? I wasn't able to find it. Cheers, Michael On Oct 8, 2014, at 3:38 AM, andy petrella andy.petre...@gmail.com wrote: Heya You can check Zeppellin or my fork of the Scala notebook. I'm going this week end to push some efforts on the doc, because it supports for realtime graphing, Scala, SQL, dynamic loading of dependencies and I started this morning a widget to track the progress of the jobs. I'm quite happy with it so far, I used it with graphx, mllib, ADAM and the Cassandra connector so far. However, its major drawback is that it is a one man (best) effort ftm! :-S Le 8 oct. 2014 11:16, Dai, Kevin yun...@ebay.com a écrit : Hi, All We need an interactive interface tool for spark in which we can run spark job and plot graph to explorer the data interactively. Ipython notebook is good, but it only support python (we want one supporting scala)… BR, Kevin.
Re: How could I start new spark cluster with hadoop2.0.2
Hi, Were you able to figure out how to choose a specific version? I'm having the same issue. Thanks.
Re: Spark-Shell: OOM: GC overhead limit exceeded
Increasing the driver memory resolved this issue. Thanks to Nick for the hint. Here is how I am starting the shell: spark-shell --driver-memory 4g --driver-cores 4 --master local
Re: SparkContext.wholeTextFiles() java.io.FileNotFoundException: File does not exist:
One more update: I've realized that this problem is not only Python-related. I've tried it in Scala as well, but I'm still getting the same error (java.io.FileNotFoundException: File does not exist: /wikiinput/wiki.xml.gz, as in my first message). My Scala code:

val file = sc.wholeTextFiles("s3n://wiki-dump/wikiinput").first()
Re: SparkContext.wholeTextFiles() java.io.FileNotFoundException: File does not exist:
Take this as a bit of a guess, since I don't use S3 much and am only a bit aware of the Hadoop+S3 integration issues. But I know that S3's lack of proper directories causes a few issues when used with Hadoop, which wants to list directories. According to http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/fs/s3native/NativeS3FileSystem.html ... I wonder if you simply need to end the path with / to make it clear you mean it as a directory. Hadoop S3 OutputFormats are going to append ..._$folder$ files to mark directories too, although I don't think it's necessarily required to read them as dirs. I still imagine there could be some problem between Hadoop and Spark in this regard, but it's worth trying the path thing first. You do need s3n:// for sure.

On Wed, Oct 8, 2014 at 4:54 PM, jan.zi...@centrum.cz wrote: One more update: I've realized that this problem is not only Python related. I've tried it also in Scala, but I'm still getting the same error. My Scala code: val file = sc.wholeTextFiles("s3n://wiki-dump/wikiinput").first() ...
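A sketch of that suggestion in code (still a guess, as above; the bucket and prefix are the ones from the thread):

    // Trailing slash makes it explicit that wikiinput is a directory...
    val pages = sc.wholeTextFiles("s3n://wiki-dump/wikiinput/")

    // ...or point at the files themselves with a glob (also worth trying).
    val pagesGlob = sc.wholeTextFiles("s3n://wiki-dump/wikiinput/*.gz")

    println(pages.first()._1)   // wholeTextFiles yields (path, content) pairs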
Re: Interactive interface tool for spark
Ummm... what's helium? Link, plz? On Oct 8, 2014, at 9:13 AM, Stephen Boesch java...@gmail.com wrote: @kevin, Michael, Second that: interested in seeing the zeppelin. pls use helium though .. 2014-10-08 7:57 GMT-07:00 Michael Allman mich...@videoamp.com: Hi Andy, This sounds awesome. Please keep us posted. Meanwhile, can you share a link to your project? I wasn't able to find it. Cheers, Michael On Oct 8, 2014, at 3:38 AM, andy petrella andy.petre...@gmail.com wrote: Heya You can check Zeppellin or my fork of the Scala notebook. I'm going this week end to push some efforts on the doc, because it supports for realtime graphing, Scala, SQL, dynamic loading of dependencies and I started this morning a widget to track the progress of the jobs. I'm quite happy with it so far, I used it with graphx, mllib, ADAM and the Cassandra connector so far. However, its major drawback is that it is a one man (best) effort ftm! :-S Le 8 oct. 2014 11:16, Dai, Kevin yun...@ebay.com a écrit : Hi, All We need an interactive interface tool for spark in which we can run spark job and plot graph to explorer the data interactively. Ipython notebook is good, but it only support python (we want one supporting scala)… BR, Kevin.
Re: spark-ec2 - HDFS doesn't start on AWS EC2 cluster
They reverted to a previous version of the spark-ec2 script and things are working again!
Re: Interactive interface tool for spark
Sure! I'll post updates as well in the ML :-) I'm doing it on twitter for now (until doc is ready). The repo is there (branch spark) : https://github.com/andypetrella/scala-notebook/tree/spark Some tweets: * very first working stuff: https://twitter.com/noootsab/status/508758335982927872/photo/1 * using graphx: https://twitter.com/noootsab/status/517073481104908289/photo/1 * using sql (it has already evolved in order to declare variable names): https://twitter.com/noootsab/status/518917295226515456/photo/1 * using ADAM+mllib: https://twitter.com/noootsab/status/511270449054220288/photo/1 There are plenty of others stuffs but will need some time for the write-up (soon) cheers, andy aℕdy ℙetrella about.me/noootsab [image: aℕdy ℙetrella on about.me] http://about.me/noootsab On Wed, Oct 8, 2014 at 4:57 PM, Michael Allman mich...@videoamp.com wrote: Hi Andy, This sounds awesome. Please keep us posted. Meanwhile, can you share a link to your project? I wasn't able to find it. Cheers, Michael On Oct 8, 2014, at 3:38 AM, andy petrella andy.petre...@gmail.com wrote: Heya You can check Zeppellin or my fork of the Scala notebook. I'm going this week end to push some efforts on the doc, because it supports for realtime graphing, Scala, SQL, dynamic loading of dependencies and I started this morning a widget to track the progress of the jobs. I'm quite happy with it so far, I used it with graphx, mllib, ADAM and the Cassandra connector so far. However, its major drawback is that it is a one man (best) effort ftm! :-S Le 8 oct. 2014 11:16, Dai, Kevin yun...@ebay.com a écrit : Hi, All We need an interactive interface tool for spark in which we can run spark job and plot graph to explorer the data interactively. Ipython notebook is good, but it only support python (we want one supporting scala)… BR, Kevin.
Re: spark-ec2 - HDFS doesn't start on AWS EC2 cluster
Yup, though to be clear, Josh reverted a change to a hosted script that spark-ec2 references. The spark-ec2 script y’all are running locally hasn’t changed, obviously. On Wed, Oct 8, 2014 at 12:20 PM, mrm ma...@skimlinks.com wrote: They reverted to a previous version of the spark-ec2 script and things are working again! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-ec2-HDFS-doesn-t-start-on-AWS-EC2-cluster-tp15921p15945.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark-ec2 - HDFS doesn't start on AWS EC2 cluster
Thanks for explanation, i was going to ask exactly about this :) On Wed, Oct 8, 2014 at 6:23 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Yup, though to be clear, Josh reverted a change to a hosted script that spark-ec2 references. The spark-ec2 script y’all are running locally hasn’t changed, obviously. On Wed, Oct 8, 2014 at 12:20 PM, mrm ma...@skimlinks.com wrote: They reverted to a previous version of the spark-ec2 script and things are working again! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-ec2-HDFS-doesn-t-start-on-AWS-EC2-cluster-tp15921p15945.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- *Jan Warchoł* *Data Engineer* - M: +48 509 078 203 E: jan.warc...@codilime.com
Re: MLLib Linear regression
The proper step size partially depends on the Lipschitz constant of the objective. You should let the machine try different combinations of parameters and select the best. We are working with people from AMPLab to make hyperparameter tunning easier in MLlib 1.2. For the theory, Nesterov's book Introductory Lectures on Convex Optimization is a good one. We didn't use line search in the current implementation of LinearRegression, which we should definitely add that option in the future. Best, Xiangrui On Wed, Oct 8, 2014 at 7:21 AM, Sameer Tilak ssti...@live.com wrote: Hi Xiangrui, Changing the default step size to 0.01 made a huge difference. The results make sense when I use A + B + C + D. MSE is ~0.07 and the outcome matches the domain knowledge. I was wondering is there any documentation on the parameters and when/how to vary them. Date: Tue, 7 Oct 2014 15:11:39 -0700 Subject: Re: MLLib Linear regression From: men...@gmail.com To: ssti...@live.com CC: user@spark.apache.org Did you test different regularization parameters and step sizes? In the combination that works, I don't see A + D. Did you test that combination? Are there any linear dependency between A's columns and D's columns? -Xiangrui On Tue, Oct 7, 2014 at 1:56 PM, Sameer Tilak ssti...@live.com wrote: BTW, one detail: When number of iterations is 100 all weights are zero or below and the indices are only from set A. When number of iterations is 150 I see 30+ non-zero weights (when sorted by weight) and indices are distributed across al sets. however MSE is high (5.xxx) and the result does not match the domain knowledge. When number of iterations is 400 I see 30+ non-zero weights (when sorted by weight) and indices are distributed across al sets. however MSE is high (6.xxx) and the result does not match the domain knowledge. Any help will be highly appreciated. From: ssti...@live.com To: user@spark.apache.org Subject: MLLib Linear regression Date: Tue, 7 Oct 2014 13:41:03 -0700 Hi All, I have following classes of features: class A: 15000 features class B: 170 features class C: 900 features Class D: 6000 features. I use linear regression (over sparse data). I get excellent results with low RMSE (~0.06) for the following combinations of classes: 1. A + B + C 2. B + C + D 3. A + B 4. A + C 5. B + D 6. C + D 7. D Unfortunately, when I use A + B + C + D (all the features) I get results that don't make any sense -- all weights are zero or below and the indices are only from set A. I also get high MSE. I changed the number of iterations from 100 to 150, 250, or even 400. I still get MSE as (5/ 6). Are there any other parameters that I can play with? Any insight on what could be wrong? Is it somehow it is not able to scale up to 22K features? (I highly doubt that). - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
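Until that hyperparameter tuning support lands, a small grid search can be scripted by hand. A minimal sketch for Spark 1.1 MLlib, assuming trainingData is an RDD[LabeledPoint] (a hypothetical name) and scoring on the training set for brevity; in practice a held-out validation split is the better choice:

    import org.apache.spark.SparkContext._   // double RDD functions (pre-1.3)
    import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}
    import org.apache.spark.rdd.RDD

    def trainingMse(data: RDD[LabeledPoint], iterations: Int, stepSize: Double): Double = {
      val model = LinearRegressionWithSGD.train(data, iterations, stepSize)
      data.map { p =>
        val err = model.predict(p.features) - p.label
        err * err
      }.mean()
    }

    // try a small grid of (iterations, step size) pairs and keep the best-scoring one
    val grid = for {
      iters <- Seq(100, 200, 400)
      step  <- Seq(0.001, 0.01, 0.1, 1.0)
    } yield ((iters, step), trainingMse(trainingData, iters, step))

    val ((bestIters, bestStep), bestMse) = grid.minBy(_._2)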
Spark SQL HiveContext Projection Pushdown
We have our analytics infra built on Spark and Parquet. We are trying to replace some of our queries based on the direct Spark RDD API to SQL based either on Spark SQL/HiveQL. Our motivation was to take advantage of the transparent projection predicate pushdown that's offered by Spark SQL and eliminate the need for persisting the RDD in memory. (Cache invalidation turned out to be a big problem for us) The below tests are done with Spark 1.1.0 on CDH 5.1.0 1. Spark SQL's (SQLContext) Parquet support was excellent for our case. The ability to query in SQL and apply scala functions as UDFs in the SQL is extremely convenient. Project pushdown works flawlessly, not much sure about predicate pushdown (we have 90% optional fields in our dataset and I remember Michael Armbrust telling me that this is a bug in Parquet in that it doesnt allow predicate pushdown for optional fields.) However we have timestamp based duplicate removal which requires windowing queries which are not working in SQLContext.sql parsing mode. 2. We then tried HiveQL using HiveContext by creating a Hive external table backed by the same Parquet data. However, in this mode, projection pushdown doesnt seem to work and it ends up reading the whole Parquet data for each query.(which slows down a lot) Please see attached the screenshot of this. Hive itself doesnt seem to have any issues with the projection pushdown. So this is weird. Is this due to any configuration problem? Thanks in advance, Anand Mohan SparkSQLHiveParquet.png (316K) http://apache-spark-user-list.1001560.n3.nabble.com/attachment/15953/0/SparkSQLHiveParquet.png -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-HiveContext-Projection-Pushdown-tp15953.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
meetup october 30-31st in SF
Hi all, We’re organizing a meetup October 30-31st in downtown SF that might be of interest to the Spark community. The focus is on large-scale data analysis and its role in neuroscience. It will feature several active Spark developers and users, including Xiangrui Meng, Josh Rosen, Reza Zadeh, and Sandy Ryza. You can sign up here (registration is free): https://www.eventbrite.com/e/codeneuro-tickets-13197330571?discount=spark Hope some of you can make it! — Jeremy -- jeremyfreeman.net @thefreemanlab
Broadcast Torrent fail - then the job dies
I am running on Windows 8 using Spark 1.1.0 in local mode with Hadoop 2.2 - I repeatedly see the following in my logs. I believe this happens in combineByKey 14/10/08 09:36:30 INFO executor.Executor: Running task 3.0 in stage 0.0 (TID 3) 14/10/08 09:36:30 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 0 14/10/08 09:36:35 ERROR broadcast.TorrentBroadcast: Reading broadcast variable 0 failed 14/10/08 09:36:35 INFO broadcast.TorrentBroadcast: Reading broadcast variable 0 took 5.006378813 s 14/10/08 09:36:35 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 0 14/10/08 09:36:35 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.NullPointerException at java.nio.ByteBuffer.wrap(ByteBuffer.java:392) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58) at org.apache.spark.scheduler.Task.run(Task.scala:54) -
Re: Spark SQL HiveContext Projection Pushdown
We are working to improve the integration here, but I can recommend the following when running spark 1.1: create an external table and set spark.sql.hive.convertMetastoreParquet=true Note that even with a HiveContext we don't support window functions yet. On Wed, Oct 8, 2014 at 10:41 AM, Anand Mohan chinn...@gmail.com wrote: We have our analytics infra built on Spark and Parquet. We are trying to replace some of our queries based on the direct Spark RDD API to SQL based either on Spark SQL/HiveQL. Our motivation was to take advantage of the transparent projection predicate pushdown that's offered by Spark SQL and eliminate the need for persisting the RDD in memory. (Cache invalidation turned out to be a big problem for us) The below tests are done with Spark 1.1.0 on CDH 5.1.0 1. Spark SQL's (SQLContext) Parquet support was excellent for our case. The ability to query in SQL and apply scala functions as UDFs in the SQL is extremely convenient. Project pushdown works flawlessly, not much sure about predicate pushdown (we have 90% optional fields in our dataset and I remember Michael Armbrust telling me that this is a bug in Parquet in that it doesnt allow predicate pushdown for optional fields.) However we have timestamp based duplicate removal which requires windowing queries which are not working in SQLContext.sql parsing mode. 2. We then tried HiveQL using HiveContext by creating a Hive external table backed by the same Parquet data. However, in this mode, projection pushdown doesnt seem to work and it ends up reading the whole Parquet data for each query.(which slows down a lot) Please see attached the screenshot of this. Hive itself doesnt seem to have any issues with the projection pushdown. So this is weird. Is this due to any configuration problem? Thanks in advance, Anand Mohan *SparkSQLHiveParquet.png* (316K) Download Attachment http://apache-spark-user-list.1001560.n3.nabble.com/attachment/15953/0/SparkSQLHiveParquet.png -- View this message in context: Spark SQL HiveContext Projection Pushdown http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-HiveContext-Projection-Pushdown-tp15953.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
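For reference, a minimal sketch of the setting described above on Spark 1.1; the table and column names are placeholders:

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)
    // Route queries over the Hive metastore Parquet table through Spark SQL's
    // native Parquet support, which prunes columns (projection pushdown).
    hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "true")

    val slice = hiveContext.sql("SELECT col_a, col_b FROM events_parquet WHERE col_a IS NOT NULL")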
Re: Broadcast Torrent fail - then the job dies
Hi Lewis, For debugging purpose, can you try using HttpBroadCast to see if the error remains? You can enable HttpBroadCast by setting spark.broadcast.factory to org.apache.spark.broadcast.HttpBroadcastFactory in spark conf. Thanks, Liquan On Wed, Oct 8, 2014 at 11:21 AM, Steve Lewis lordjoe2...@gmail.com wrote: I am running on Windows 8 using Spark 1.1.0 in local mode with Hadoop 2.2 - I repeatedly see the following in my logs. I believe this happens in combineByKey 14/10/08 09:36:30 INFO executor.Executor: Running task 3.0 in stage 0.0 (TID 3) 14/10/08 09:36:30 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 0 14/10/08 09:36:35 ERROR broadcast.TorrentBroadcast: Reading broadcast variable 0 failed 14/10/08 09:36:35 INFO broadcast.TorrentBroadcast: Reading broadcast variable 0 took 5.006378813 s 14/10/08 09:36:35 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 0 14/10/08 09:36:35 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.NullPointerException at java.nio.ByteBuffer.wrap(ByteBuffer.java:392) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58) at org.apache.spark.scheduler.Task.run(Task.scala:54) - -- Liquan Pei Department of Physics University of Massachusetts Amherst
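A sketch of how that setting can be applied when building the context; the app name is arbitrary and local[4] simply mirrors the local-mode setup described in the report:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("broadcast-debug")
      .setMaster("local[4]")
      // fall back to HTTP-based broadcast instead of TorrentBroadcast
      .set("spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")
    val sc = new SparkContext(conf)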
Re: How to do broadcast join in SparkSQL
Thanks for the input. We purposefully made sure that the config option did not make it into a release as it is not something that we are willing to support long term. That said we'll try and make this easier in the future either through hints or better support for statistics. In this particular case you can get what you want by registering the tables as external tables and setting an flag. Here's a helper function to do what you need. /** * Sugar for creating a Hive external table from a parquet path. */ def createParquetTable(name: String, file: String): Unit = { import org.apache.spark.sql.hive.HiveMetastoreTypes val rdd = parquetFile(file) val schema = rdd.schema.fields.map(f = s${f.name} ${HiveMetastoreTypes.toMetastoreType(f.dataType)}).mkString(,\n) val ddl = s |CREATE EXTERNAL TABLE $name ( | $schema |) |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe' |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat' |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat' |LOCATION '$file'.stripMargin sql(ddl) setConf(spark.sql.hive.convertMetastoreParquet, true) } You'll also need to run this to populate the statistics: ANALYZE TABLE tableName COMPUTE STATISTICS noscan; On Wed, Oct 8, 2014 at 1:44 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Ok, currently there's cost-based optimization however Parquet statistics is not implemented... What's the good way if I want to join a big fact table with several tiny dimension tables in Spark SQL (1.1)? I wish we can allow user hint for the join. Jianshi On Wed, Oct 8, 2014 at 2:18 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not merged into master? I cannot find spark.sql.hints.broadcastTables in latest master, but it's in the following patch. https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5 Jianshi On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Yes, looks like it can only be controlled by the parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird to me. How am I suppose to know the exact bytes of a table? Let me specify the join algorithm is preferred I think. Jianshi On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu yuzhih...@gmail.com wrote: Have you looked at SPARK-1800 ? e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala Cheers On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: I cannot find it in the documentation. And I have a dozen dimension tables to (left) join... Cheers, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
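A rough usage sketch of the helper above, with made-up table names, paths, and threshold value, to show how the pieces fit together:

    // make the small dimension table visible to the optimizer, with statistics
    createParquetTable("dim_country", "hdfs:///warehouse/dim_country.parquet")
    sql("ANALYZE TABLE dim_country COMPUTE STATISTICS noscan")

    // tables whose estimated size falls under this threshold become the broadcast side
    setConf("spark.sql.autoBroadcastJoinThreshold", (10 * 1024 * 1024).toString)

    val joined = sql(
      "SELECT f.*, d.country_name FROM fact_sales f JOIN dim_country d ON f.country_id = d.country_id")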
Re: Is there a way to look at RDD's lineage? Or debug a fault-tolerance error?
Using a var for RDDs in this way is not going to work. In this example, tx1.zip(tx2) would create and RDD that depends on tx2, but then soon after that, you change what tx2 means, so you would end up having a circular dependency. On Wed, Oct 8, 2014 at 12:01 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: My job is not being fault-tolerant (e.g., when there's a fetch failure or something). The lineage of RDDs are constantly updated every iteration. However, I think that when there's a failure, the lineage information is not being correctly reapplied. It goes something like this: val rawRDD = read(...) val repartRDD = rawRDD.repartition(X) val tx1 = repartRDD.map(...) var tx2 = tx1.map(...) while (...) { tx2 = tx1.zip(tx2).map(...) } Is there any way to monitor RDD's lineage, maybe even including? I want to make sure that there's no unexpected things happening.
Spark on YARN driver memory allocation bug?
So, I think this is a bug, but I wanted to get some feedback before I reported it as such. On Spark on YARN, 1.1.0, if you specify the --driver-memory value to be higher than the memory available on the client machine, Spark errors out due to failing to allocate enough memory. This happens even in yarn-cluster mode. Shouldn't it only allocate that memory on the YARN node that is going to run the driver process, not the local client machine? Greg
Re: Is there a way to look at RDD's lineage? Or debug a fault-tolerance error?
There is a toDebugString method in rdd that will print a description of this RDD and its recursive dependencies for debugging. Thanks, Liquan On Wed, Oct 8, 2014 at 12:01 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: My job is not being fault-tolerant (e.g., when there's a fetch failure or something). The lineage of RDDs are constantly updated every iteration. However, I think that when there's a failure, the lineage information is not being correctly reapplied. It goes something like this: val rawRDD = read(...) val repartRDD = rawRDD.repartition(X) val tx1 = repartRDD.map(...) var tx2 = tx1.map(...) while (...) { tx2 = tx1.zip(tx2).map(...) } Is there any way to monitor RDD's lineage, maybe even including? I want to make sure that there's no unexpected things happening. -- Liquan Pei Department of Physics University of Massachusetts Amherst
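For example, applied to the loop from the question (tx2 being the var that is rebuilt each iteration), printing the lineage every pass makes it easy to spot when it stops looking like what you expect:

    println(tx2.toDebugString)   // prints this RDD and its recursive dependencies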
Re: Support for Parquet V2 in ParquetTableSupport?
Thats a good question, I'm not sure if that will work. I will note that we are hoping to do some upgrades of our parquet support in the near future. On Tue, Oct 7, 2014 at 10:33 PM, Michael Allman mich...@videoamp.com wrote: Hello, I was interested in testing Parquet V2 with Spark SQL, but noticed after some investigation that the parquet writer that Spark SQL uses is fixed at V1 here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala#L350. Any particular reason Spark SQL is hard-coded to write Parquet V1? Should I expect trouble if I write Parquet V2? Cheers, Michael - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Is there a way to look at RDD's lineage? Or debug a fault-tolerance error?
There is no circular dependency. Its simply dropping references to prev RDDs because there is no need for it. I wonder if that messes up things up though internally for Spark due to losing references to intermediate RDDs. On Oct 8, 2014, at 12:13 PM, Akshat Aranya aara...@gmail.com wrote: Using a var for RDDs in this way is not going to work. In this example, tx1.zip(tx2) would create and RDD that depends on tx2, but then soon after that, you change what tx2 means, so you would end up having a circular dependency. On Wed, Oct 8, 2014 at 12:01 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: My job is not being fault-tolerant (e.g., when there's a fetch failure or something). The lineage of RDDs are constantly updated every iteration. However, I think that when there's a failure, the lineage information is not being correctly reapplied. It goes something like this: val rawRDD = read(...) val repartRDD = rawRDD.repartition(X) val tx1 = repartRDD.map(...) var tx2 = tx1.map(...) while (...) { tx2 = tx1.zip(tx2).map(...) } Is there any way to monitor RDD's lineage, maybe even including? I want to make sure that there's no unexpected things happening.
Re: spark-ec2 - HDFS doesn't start on AWS EC2 cluster
Revert the script to an older version. https://github.com/apache/spark/tree/branch-1.1/ec2 Thanks Best Regards On Wed, Oct 8, 2014 at 9:57 PM, Jan Warchoł jan.warc...@codilime.com wrote: Thanks for explanation, i was going to ask exactly about this :) On Wed, Oct 8, 2014 at 6:23 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Yup, though to be clear, Josh reverted a change to a hosted script that spark-ec2 references. The spark-ec2 script y’all are running locally hasn’t changed, obviously. On Wed, Oct 8, 2014 at 12:20 PM, mrm ma...@skimlinks.com wrote: They reverted to a previous version of the spark-ec2 script and things are working again! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-ec2-HDFS-doesn-t-start-on-AWS-EC2-cluster-tp15921p15945.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- *Jan Warchoł* *Data Engineer* - M: +48 509 078 203 E: jan.warc...@codilime.com
Dedup
I need to do deduplication processing in Spark. The current plan is to generate a tuple where the key is the dedup criteria and the value is the original input. I am thinking of using reduceByKey to discard duplicate values. If I do that, can I simply return the first argument, or should I return a copy of the first argument? Is there a better way to do dedup in Spark? -Yao
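A minimal sketch of that reduceByKey approach; dedupKey is a hypothetical function that extracts the dedup criteria from a record, and input is the source RDD:

    import org.apache.spark.SparkContext._   // pair RDD functions (pre-1.3)

    // keep one record per dedup key; returning the first argument as-is is fine,
    // since reduceByKey does not mutate its inputs no defensive copy is needed
    val deduped = input
      .map(record => (dedupKey(record), record))
      .reduceByKey((a, b) => a)
      .values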
Re: Dedup
Multiple values may be different, yet still be considered duplicates depending on how the dedup criteria is selected. Is that correct? Do you care in that case what value you select for a given key? On Wed, Oct 8, 2014 at 3:37 PM, Ge, Yao (Y.) y...@ford.com wrote: I need to do deduplication processing in Spark. The current plan is to generate a tuple where key is the dedup criteria and value is the original input. I am thinking to use reduceByKey to discard duplicate values. If I do that, can I simply return the first argument or should I return a copy of the first argument. Is there are better way to do dedup in Spark? -Yao
Re: Spark Monitoring with Ganglia
Hi, If using Ganglia is not an absolute requirement, check out SPM http://sematext.com/spm/ for Spark -- http://blog.sematext.com/2014/10/07/apache-spark-monitoring/ It monitors all Spark metrics (i.e. you don't need to figure out what you need to monitor, how to get it, how to graph it, etc.) and has alerts and anomaly detection built in.. If you use Spark with Hadoop, Kafka, Cassandra, HBase, Elasticsearch SPM monitors them, too, so you can have visibility into all your tech in one place. You can send Spark event logs to Logsene http://sematext.com/logsene/, too, if you want, and then you can have your performance and log graphs side by side. Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/ On Wed, Oct 1, 2014 at 4:30 PM, danilopds danilob...@gmail.com wrote: Hi, I need monitoring some aspects about my cluster like network and resources. Ganglia looks like a good option for what I need. Then, I found out that Spark has support to Ganglia. On the Spark monitoring webpage there is this information: To install the GangliaSink you’ll need to perform a custom build of Spark. I found in my Spark the directory: /extras/spark-ganglia-lgpl. But I don't know how to install it. How can I install the Ganglia to monitoring Spark cluster? How I do this custom build? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Monitoring-with-Ganglia-tp15538.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
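On the custom-build part of the original question: the docs are referring to enabling the spark-ganglia-lgpl code at build time and then pointing the sink at your gmond. Roughly, with the Hadoop profile/version and the host/port being examples for your own cluster:

    mvn -Pspark-ganglia-lgpl -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package

    # conf/metrics.properties
    *.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
    *.sink.ganglia.host=<gmond-host>
    *.sink.ganglia.port=8649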
Re: Spark on YARN driver memory allocation bug?
Hi Greg, It does seem like a bug. What is the particular exception message that you see? Andrew 2014-10-08 12:12 GMT-07:00 Greg Hill greg.h...@rackspace.com: So, I think this is a bug, but I wanted to get some feedback before I reported it as such. On Spark on YARN, 1.1.0, if you specify the --driver-memory value to be higher than the memory available on the client machine, Spark errors out due to failing to allocate enough memory. This happens even in yarn-cluster mode. Shouldn't it only allocate that memory on the YARN node that is going to run the driver process, not the local client machine? Greg
Re: Running Spark cluster on local machine, cannot connect to master error
Theodore, did you ever get this resolved? I just ran into the same thing. Before digging, I figured I'd ask. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-Spark-cluster-on-local-machine-cannot-connect-to-master-error-tp12743p15972.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Running Spark cluster on local machine, cannot connect to master error
Hi Russell and Theodore, This usually means your Master / Workers / client machine are running different versions of Spark. On a local machine, you may want to restart your master and workers (sbin/stop-all.sh, then sbin/start-all.sh). On a real cluster, you want to make sure that every node (including the submit client) has the same Spark assembly jar before restarting the master and the workers. Let me know if that fixes it. -Andrew 2014-10-08 13:29 GMT-07:00 rrussell25 rrussel...@gmail.com: Theodore, did you ever get this resolved? I just ran into the same thing. Before digging, I figured I'd ask. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-Spark-cluster-on-local-machine-cannot-connect-to-master-error-tp12743p15972.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: executors not created yarn-cluster mode
Hi Jamborta, It could be that your executors are requesting too much memory. I'm not sure why it works in client mode but not in cluster mode, however. Have you checked the RM logs for messages that complain about container memory requested being too high? How much memory is each of your container asking for (check AM logs for this)? As a sanity check you could try lowering the executor memory to see if that's in fact the issue. Another factor at play here is spark.yarn.executor.memoryOverhead, which adds to the amount of memory requested and is 348MB by default. Let me know if you find anything, -Andrew 2014-10-08 12:00 GMT-07:00 jamborta jambo...@gmail.com: Hi all, I have a setup that works fine in yarn-client mode, but when I change that to yarn-cluster, the executors don't get created, apart from the driver (it seems that it does not even appear in yarn's resource manager). There is nothing in the spark logs, either (even when debug is enabled). When I try to submit something to the sparkcontext I get the following error: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory Just wondering where can I find the logs that show the executor creation process? I looked at yarn logs, also the spark event logs, couldn't find anything. many thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/executors-not-created-yarn-cluster-mode-tp15957.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
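For what it's worth, both knobs can be pinned explicitly at submit time, which makes it easier to compare what is being asked of YARN against what the RM will grant; the class name and sizes below are just examples:

    spark-submit --master yarn-cluster \
      --num-executors 4 --executor-memory 1g \
      --conf spark.yarn.executor.memoryOverhead=512 \
      --class com.example.MyApp myapp.jar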
protobuf error running spark on hadoop 2.4
Hi: I tried to build Spark (1.1.0) with Hadoop 2.4.0, and ran a simple wordcount example using spark-shell on Mesos. When I ran my application, I got the following error, which looks related to a mismatch of protobuf versions between the Hadoop cluster (protobuf 2.5) and Spark (protobuf 2.4.1). I ran mvn dependency:tree -Dincludes=*protobuf* and found that akka pulled in this protobuf 2.4.1. Has anyone seen this problem before? Thanks. Error when running Spark on Hadoop 2.4.0: java.lang.VerifyError: class org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$AppendRequestProto overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet; mvn dependency:tree -Dincludes=*protobuf* ...
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ spark-core_2.10 ---
[INFO] org.apache.spark:spark-core_2.10:jar:1.1.0
[INFO] \- org.spark-project.akka:akka-remote_2.10:jar:2.2.3-shaded-protobuf:compile
[INFO]    \- org.spark-project.protobuf:protobuf-java:jar:2.4.1-shaded:compile
Re: executors not created yarn-cluster mode
hi Andrew, Thanks for the reply, I tried to tune the memory, changed it as low as possible, no luck. My guess is that this issue is related to what is discussed here http://apache-spark-user-list.1001560.n3.nabble.com/Initial-job-has-not-accepted-any-resources-td11668.html that is the executors cannot connect back to the driver (in my case I am not sure if they are even started). I could not find a way to debug, as the log files don't have any error in them. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/executors-not-created-yarn-cluster-mode-tp15957p15976.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: protobuf error running spark on hadoop 2.4
I have faced this in the past and I have to put a profile -Phadoop2.3... mvn -Dhadoop.version=2.3.0-cdh5.1.0 -Phadoop-2.3 -Pyarn -DskipTests install On Wed, Oct 8, 2014 at 1:40 PM, Chuang Liu liuchuan...@gmail.com wrote: Hi: I tried to build Spark (1.1.0) with hadoop 2.4.0, and ran a simple wordcount example using spark_shell on mesos. When I ran my application, I got following error that looks related to the mismatch of protobuf versions between hadoop cluster (protobuf 2.5) and spark (protobuf 4.1). I ran mvn dependency:tree -Dincludes=*protobuf*, and found that zkka pulled in this protobuf 4.1 Have anyone seen this problem before ? Thanks. Error when running spark on hadoop 2.4.0 *java.lang.VerifyError: class org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$AppendRequestProto overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet;* mvn dependency:tree -Dincludes=*protobuf* ... *[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ spark-core_2.10 ---[INFO] org.apache.spark:spark-core_2.10:jar:1.1.0[INFO] \- org.spark-project.akka:akka-remote_2.10:jar:2.2.3-shaded-protobuf:compile[INFO] \- org.spark-project.protobuf:protobuf-java:jar:2.4.1-shaded:compile*
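Since the cluster in question is stock Apache Hadoop 2.4.0 rather than CDH, the analogous build would presumably use the hadoop-2.4 profile, which should pull in the protobuf 2.5 line that matches the cluster:

    mvn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean install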
Building pyspark with maven?
The build instructions for pyspark appear to be: sbt/sbt assembly Given that maven is the preferred build tool since July 1, presumably I have overlooked the instructions for building via maven? Anyone please point it out? thanks
Re: Dedup
Maybe you could implement something like this (i don't know if something similar already exists in spark): http://www.cs.berkeley.edu/~jnwang/papers/icde14_massjoin.pdf Best, Flavio On Oct 8, 2014 9:58 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Multiple values may be different, yet still be considered duplicates depending on how the dedup criteria is selected. Is that correct? Do you care in that case what value you select for a given key? On Wed, Oct 8, 2014 at 3:37 PM, Ge, Yao (Y.) y...@ford.com wrote: I need to do deduplication processing in Spark. The current plan is to generate a tuple where key is the dedup criteria and value is the original input. I am thinking to use reduceByKey to discard duplicate values. If I do that, can I simply return the first argument or should I return a copy of the first argument. Is there are better way to do dedup in Spark? -Yao
Re: Building pyspark with maven?
Looking more closely, inside core/pom.xml there are a few references to the python build. This question mostly has to do with my limited knowledge of the Python environment. I will look up how to set up a Python module. It appears a hack is to add $SPARK_HOME/core/target/scala-version/classes to the Python module search path. 2014-10-08 14:01 GMT-07:00 Stephen Boesch java...@gmail.com: The build instructions for pyspark appear to be: sbt/sbt assembly Given that maven is the preferred build tool since July 1, presumably I have overlooked the instructions for building via maven? Anyone please point it out? thanks
Re: Support for Parquet V2 in ParquetTableSupport?
I am planning to try upgrading spark sql to a newer version of parquet, too. I'll let you know if I make progress. Thanks, Michael On Oct 8, 2014, at 12:17 PM, Michael Armbrust mich...@databricks.com wrote: Thats a good question, I'm not sure if that will work. I will note that we are hoping to do some upgrades of our parquet support in the near future. On Tue, Oct 7, 2014 at 10:33 PM, Michael Allman mich...@videoamp.com wrote: Hello, I was interested in testing Parquet V2 with Spark SQL, but noticed after some investigation that the parquet writer that Spark SQL uses is fixed at V1 here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala#L350. Any particular reason Spark SQL is hard-coded to write Parquet V1? Should I expect trouble if I write Parquet V2? Cheers, Michael - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Interactive interface tool for spark
Hi, Please check Zeppelin, too. http://zeppelin-project.org https://github.com/nflabs/zeppelin Which is similar to scala notebook. Best, moon 2014년 10월 9일 목요일, andy petrellaandy.petre...@gmail.com님이 작성한 메시지: Sure! I'll post updates as well in the ML :-) I'm doing it on twitter for now (until doc is ready). The repo is there (branch spark) : https://github.com/andypetrella/scala-notebook/tree/spark Some tweets: * very first working stuff: https://twitter.com/noootsab/status/508758335982927872/photo/1 * using graphx: https://twitter.com/noootsab/status/517073481104908289/photo/1 * using sql (it has already evolved in order to declare variable names): https://twitter.com/noootsab/status/518917295226515456/photo/1 * using ADAM+mllib: https://twitter.com/noootsab/status/511270449054220288/photo/1 There are plenty of others stuffs but will need some time for the write-up (soon) cheers, andy aℕdy ℙetrella about.me/noootsab [image: aℕdy ℙetrella on about.me] http://about.me/noootsab On Wed, Oct 8, 2014 at 4:57 PM, Michael Allman mich...@videoamp.com javascript:_e(%7B%7D,'cvml','mich...@videoamp.com'); wrote: Hi Andy, This sounds awesome. Please keep us posted. Meanwhile, can you share a link to your project? I wasn't able to find it. Cheers, Michael On Oct 8, 2014, at 3:38 AM, andy petrella andy.petre...@gmail.com javascript:_e(%7B%7D,'cvml','andy.petre...@gmail.com'); wrote: Heya You can check Zeppellin or my fork of the Scala notebook. I'm going this week end to push some efforts on the doc, because it supports for realtime graphing, Scala, SQL, dynamic loading of dependencies and I started this morning a widget to track the progress of the jobs. I'm quite happy with it so far, I used it with graphx, mllib, ADAM and the Cassandra connector so far. However, its major drawback is that it is a one man (best) effort ftm! :-S Le 8 oct. 2014 11:16, Dai, Kevin yun...@ebay.com javascript:_e(%7B%7D,'cvml','yun...@ebay.com'); a écrit : Hi, All We need an interactive interface tool for spark in which we can run spark job and plot graph to explorer the data interactively. Ipython notebook is good, but it only support python (we want one supporting scala)… BR, Kevin.
Re: Building pyspark with maven?
Have you looked at http://spark.apache.org/docs/latest/building-with-maven.html ? Especially http://spark.apache.org/docs/latest/building-with-maven.html#building-for-pyspark-on-yarn Cheers On Wed, Oct 8, 2014 at 2:01 PM, Stephen Boesch java...@gmail.com wrote: The build instructions for pyspark appear to be: sbt/sbt assembly Given that maven is the preferred build tool since July 1, presumably I have overlooked the instructions for building via maven? Anyone please point it out? thanks
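If it helps, a typical Maven build of the assembly that PySpark can then run against looks like the following; the profile and version should be adjusted to match your Hadoop distribution:

    mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package
    ./bin/pyspark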
Re: Broadcast Torrent fail - then the job dies
That converts the error to the following 14/10/08 13:27:40 INFO executor.Executor: Running task 3.0 in stage 0.0 (TID 3) 14/10/08 13:27:40 INFO broadcast.HttpBroadcast: Started reading broadcast variable 0 14/10/08 13:27:40 ERROR executor.Executor: Exception in task 1.0 in stage 0.0 (TID 1) java.io.FileNotFoundException: http://192.168.1.4:54221/broadcast_0 at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1610) at org.apache.spark.broadcast.HttpBroadcast$.org$apache$spark$broadcast$HttpBroadcast$$read(HttpBroadcast.scala:197) at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:89) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:991) Curiously the error is very repeatable on a relatively large and complex program I am running but the same Spark steps work well when the Objects are Strings and Integers like word count. My objects are complex but Serialize well and run when I drop a combineByKey step On Wed, Oct 8, 2014 at 12:00 PM, Liquan Pei liquan...@gmail.com wrote: Hi Lewis, For debugging purpose, can you try using HttpBroadCast to see if the error remains? You can enable HttpBroadCast by setting spark.broadcast.factory to org.apache.spark.broadcast.HttpBroadcastFactory in spark conf. Thanks, Liquan On Wed, Oct 8, 2014 at 11:21 AM, Steve Lewis lordjoe2...@gmail.com wrote: I am running on Windows 8 using Spark 1.1.0 in local mode with Hadoop 2.2 - I repeatedly see the following in my logs. I believe this happens in combineByKey 14/10/08 09:36:30 INFO executor.Executor: Running task 3.0 in stage 0.0 (TID 3) 14/10/08 09:36:30 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 0 14/10/08 09:36:35 ERROR broadcast.TorrentBroadcast: Reading broadcast variable 0 failed 14/10/08 09:36:35 INFO broadcast.TorrentBroadcast: Reading broadcast variable 0 took 5.006378813 s 14/10/08 09:36:35 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 0 14/10/08 09:36:35 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.NullPointerException at java.nio.ByteBuffer.wrap(ByteBuffer.java:392) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58) at org.apache.spark.scheduler.Task.run(Task.scala:54) - -- Liquan Pei Department of Physics University of Massachusetts Amherst -- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
Re: Is there a way to look at RDD's lineage? Or debug a fault-tolerance error?
One thing I didn't mention is that we actually do data.repartition before hand with shuffle. I found that this can actually introduce randomness to lineage steps, because data get shuffled to different partitions and lead to inconsistent behavior if your algorithm is dependent on the order at which the data rows appear, because now data rows will appear in a different orders. If you want to guarantee fault-tolerance, you can't have any randomness whatsoever in lineage steps, and repartition violates that (depending on what you do with the data). On Wed, Oct 8, 2014 at 12:24 PM, Sung Hwan Chung coded...@gmail.com wrote: There is no circular dependency. Its simply dropping references to prev RDDs because there is no need for it. I wonder if that messes up things up though internally for Spark due to losing references to intermediate RDDs. On Oct 8, 2014, at 12:13 PM, Akshat Aranya aara...@gmail.com wrote: Using a var for RDDs in this way is not going to work. In this example, tx1.zip(tx2) would create and RDD that depends on tx2, but then soon after that, you change what tx2 means, so you would end up having a circular dependency. On Wed, Oct 8, 2014 at 12:01 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: My job is not being fault-tolerant (e.g., when there's a fetch failure or something). The lineage of RDDs are constantly updated every iteration. However, I think that when there's a failure, the lineage information is not being correctly reapplied. It goes something like this: val rawRDD = read(...) val repartRDD = rawRDD.repartition(X) val tx1 = repartRDD.map(...) var tx2 = tx1.map(...) while (...) { tx2 = tx1.zip(tx2).map(...) } Is there any way to monitor RDD's lineage, maybe even including? I want to make sure that there's no unexpected things happening.
coalesce with shuffle or repartition is not necessarily fault-tolerant
I noticed that repartition will result in non-deterministic lineage because it'll result in changed orders for rows. So for instance, if you do things like: val data = read(...) val k = data.repartition(5) val h = k.repartition(5) It seems that this results in different ordering of rows for 'k' each time you call it. And because of this different ordering, 'h' will result in different partitions even, because 'repartition' distributes through a random number generator with the 'index' as the key.
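If deterministic lineage matters more than perfectly even partition sizes, one workaround (a sketch, not something Spark does for you) is to avoid repartition's random assignment and instead partition by an explicit, stable key, so that a recomputed partition always receives exactly the same rows. Here recordKey is a hypothetical function returning a stable identifier per row, and data is the RDD from the example above:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._   // pair RDD functions (pre-1.3)

    val keyed = data.map(row => (recordKey(row), row))
    val partitioned = keyed.partitionBy(new HashPartitioner(5)).values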
Re: Interactive interface tool for spark
Hi Andy, It sounds great! Quick questions: I have been using IPython + PySpark. I crunch the data by PySpark and then visualize the data by Python libraries like matplotlib and basemap. Could I still use these Python libraries in the Scala Notebook? If not, what is suggested approaches for visualization there? Thanks. Kelvin On Wed, Oct 8, 2014 at 9:14 AM, andy petrella andy.petre...@gmail.com wrote: Sure! I'll post updates as well in the ML :-) I'm doing it on twitter for now (until doc is ready). The repo is there (branch spark) : https://github.com/andypetrella/scala-notebook/tree/spark Some tweets: * very first working stuff: https://twitter.com/noootsab/status/508758335982927872/photo/1 * using graphx: https://twitter.com/noootsab/status/517073481104908289/photo/1 * using sql (it has already evolved in order to declare variable names): https://twitter.com/noootsab/status/518917295226515456/photo/1 * using ADAM+mllib: https://twitter.com/noootsab/status/511270449054220288/photo/1 There are plenty of others stuffs but will need some time for the write-up (soon) cheers, andy aℕdy ℙetrella about.me/noootsab [image: aℕdy ℙetrella on about.me] http://about.me/noootsab On Wed, Oct 8, 2014 at 4:57 PM, Michael Allman mich...@videoamp.com wrote: Hi Andy, This sounds awesome. Please keep us posted. Meanwhile, can you share a link to your project? I wasn't able to find it. Cheers, Michael On Oct 8, 2014, at 3:38 AM, andy petrella andy.petre...@gmail.com wrote: Heya You can check Zeppellin or my fork of the Scala notebook. I'm going this week end to push some efforts on the doc, because it supports for realtime graphing, Scala, SQL, dynamic loading of dependencies and I started this morning a widget to track the progress of the jobs. I'm quite happy with it so far, I used it with graphx, mllib, ADAM and the Cassandra connector so far. However, its major drawback is that it is a one man (best) effort ftm! :-S Le 8 oct. 2014 11:16, Dai, Kevin yun...@ebay.com a écrit : Hi, All We need an interactive interface tool for spark in which we can run spark job and plot graph to explorer the data interactively. Ipython notebook is good, but it only support python (we want one supporting scala)… BR, Kevin.
Spark-SQL: SchemaRDD - ClassCastException
Hi I am in the process of migrating some logic in pig scripts to Spark-SQL. As part of this process, I am creating a few Select...Group By queries and registering them as tables using the SchemaRDD.registerAsTable feature. When using such a registered table in a subsequent Select...Group By query, I get a ClassCastException. java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer This happens when I use the Sum function on one of the columns. Is there any way to specify the data type for the columns when the registerAsTable function is called? Are there other approaches that I should be looking at? Thanks for your help. - Ranga
Re: Spark-SQL: SchemaRDD - ClassCastException
Using SUM on a string should automatically cast the column. Also you can use CAST to change the datatype https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-TypeConversionFunctions . What version of Spark are you running? This could be https://issues.apache.org/jira/browse/SPARK-1994 On Wed, Oct 8, 2014 at 3:47 PM, Ranga sra...@gmail.com wrote: Hi I am in the process of migrating some logic in pig scripts to Spark-SQL. As part of this process, I am creating a few Select...Group By query and registering them as tables using the SchemaRDD.registerAsTable feature. When using such a registered table in a subsequent Select...Group By query, I get a ClassCastException. java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer This happens when I use the Sum function on one of the columns. Is there anyway to specify the data type for the columns when the registerAsTable function is called? Are there other approaches that I should be looking at? Thanks for your help. - Ranga
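For example, an explicit cast in the aggregate might look like the query below; c1/c2/c3 follow the naming used later in this thread, and note that the set of type keywords accepted by the plain SQLContext parser is limited, whereas HiveQL's CAST is more permissive:

    val aggRdd = sql("SELECT c1, c2, SUM(CAST(c3 AS DOUBLE)) FROM sourceRDD GROUP BY c1, c2")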
Re: GroupBy Key and then sort values with the group
Sean, I am having a similar issue, but I have so much data per group that I cannot materialize the iterable into a List or Seq in memory (I tried it and it runs into OOM). Is there any other way to do this? I also tried a secondary sort, with the key being group::time, but the problem with that is that the same group name ends up in multiple partitions, so I am having to run sortByKey with one partition - sortByKey(true, 1) - which shuffles a lot of data. A sketch of a possible partitioner for this follows below. Thanks, -C -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/GroupBy-Key-and-then-sort-values-with-the-group-tp14455p15990.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
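One standard trick for the partition-spreading problem described above is to keep the composite (group, time) key for ordering but partition only on the group component, so that a group never spans partitions. A sketch, where the key types, RDD name, and partition count are assumptions:

    import org.apache.spark.Partitioner
    import org.apache.spark.SparkContext._   // pair RDD functions (pre-1.3)

    class GroupPartitioner(val numPartitions: Int) extends Partitioner {
      override def getPartition(key: Any): Int = {
        val group = key.asInstanceOf[(String, Long)]._1   // partition on the group only
        val mod = group.hashCode % numPartitions
        if (mod < 0) mod + numPartitions else mod
      }
    }

    // keyed: RDD[((String, Long), V)] with the timestamp as the second key field
    val partitioned = keyed.partitionBy(new GroupPartitioner(32))
    // On Spark 1.2+ the same partitioner can be handed to
    // repartitionAndSortWithinPartitions to get the within-group ordering without
    // pulling a whole group into memory; on 1.1 the sort has to happen inside
    // mapPartitions after the partitionBy.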
Re: Spark-SQL: SchemaRDD - ClassCastException
Thanks Michael. Should the cast be done in the source RDD or while doing the SUM? To give a better picture here is the code sequence: val sourceRdd = sql(select ... from source-hive-table) sourceRdd.registerAsTable(sourceRDD) val aggRdd = sql(select c1, c2, sum(c3) from sourceRDD group by c1, c2) // This query throws the exception when I collect the results I tried adding the cast to the aggRdd query above and that didn't help. - Ranga On Wed, Oct 8, 2014 at 3:52 PM, Michael Armbrust mich...@databricks.com wrote: Using SUM on a string should automatically cast the column. Also you can use CAST to change the datatype https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-TypeConversionFunctions . What version of Spark are you running? This could be https://issues.apache.org/jira/browse/SPARK-1994 On Wed, Oct 8, 2014 at 3:47 PM, Ranga sra...@gmail.com wrote: Hi I am in the process of migrating some logic in pig scripts to Spark-SQL. As part of this process, I am creating a few Select...Group By query and registering them as tables using the SchemaRDD.registerAsTable feature. When using such a registered table in a subsequent Select...Group By query, I get a ClassCastException. java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer This happens when I use the Sum function on one of the columns. Is there anyway to specify the data type for the columns when the registerAsTable function is called? Are there other approaches that I should be looking at? Thanks for your help. - Ranga
Re: Spark-SQL: SchemaRDD - ClassCastException
Which version of Spark are you running? On Wed, Oct 8, 2014 at 4:18 PM, Ranga sra...@gmail.com wrote: Thanks Michael. Should the cast be done in the source RDD or while doing the SUM? To give a better picture here is the code sequence: val sourceRdd = sql(select ... from source-hive-table) sourceRdd.registerAsTable(sourceRDD) val aggRdd = sql(select c1, c2, sum(c3) from sourceRDD group by c1, c2) // This query throws the exception when I collect the results I tried adding the cast to the aggRdd query above and that didn't help. - Ranga On Wed, Oct 8, 2014 at 3:52 PM, Michael Armbrust mich...@databricks.com wrote: Using SUM on a string should automatically cast the column. Also you can use CAST to change the datatype https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-TypeConversionFunctions . What version of Spark are you running? This could be https://issues.apache.org/jira/browse/SPARK-1994 On Wed, Oct 8, 2014 at 3:47 PM, Ranga sra...@gmail.com wrote: Hi I am in the process of migrating some logic in pig scripts to Spark-SQL. As part of this process, I am creating a few Select...Group By query and registering them as tables using the SchemaRDD.registerAsTable feature. When using such a registered table in a subsequent Select...Group By query, I get a ClassCastException. java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer This happens when I use the Sum function on one of the columns. Is there anyway to specify the data type for the columns when the registerAsTable function is called? Are there other approaches that I should be looking at? Thanks for your help. - Ranga
Re: Spark-SQL: SchemaRDD - ClassCastException
Sorry. Its 1.1.0. After digging a bit more into this, it seems like the OpenCSV Deseralizer converts all the columns to a String type. This maybe throwing the execution off. Planning to create a class and map the rows to this custom class. Will keep this thread updated. On Wed, Oct 8, 2014 at 5:11 PM, Michael Armbrust mich...@databricks.com wrote: Which version of Spark are you running? On Wed, Oct 8, 2014 at 4:18 PM, Ranga sra...@gmail.com wrote: Thanks Michael. Should the cast be done in the source RDD or while doing the SUM? To give a better picture here is the code sequence: val sourceRdd = sql(select ... from source-hive-table) sourceRdd.registerAsTable(sourceRDD) val aggRdd = sql(select c1, c2, sum(c3) from sourceRDD group by c1, c2) // This query throws the exception when I collect the results I tried adding the cast to the aggRdd query above and that didn't help. - Ranga On Wed, Oct 8, 2014 at 3:52 PM, Michael Armbrust mich...@databricks.com wrote: Using SUM on a string should automatically cast the column. Also you can use CAST to change the datatype https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-TypeConversionFunctions . What version of Spark are you running? This could be https://issues.apache.org/jira/browse/SPARK-1994 On Wed, Oct 8, 2014 at 3:47 PM, Ranga sra...@gmail.com wrote: Hi I am in the process of migrating some logic in pig scripts to Spark-SQL. As part of this process, I am creating a few Select...Group By query and registering them as tables using the SchemaRDD.registerAsTable feature. When using such a registered table in a subsequent Select...Group By query, I get a ClassCastException. java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer This happens when I use the Sum function on one of the columns. Is there anyway to specify the data type for the columns when the registerAsTable function is called? Are there other approaches that I should be looking at? Thanks for your help. - Ranga
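A sketch of the map-to-a-typed-class idea on 1.1, assuming sqlContext is the context already in use and that c3 is always parseable as a number; converting it up front means SUM never sees strings:

    case class Source(c1: String, c2: String, c3: Double)

    import sqlContext.createSchemaRDD   // implicit RDD[Product] -> SchemaRDD conversion

    val typed = sourceRdd.map(r => Source(r.getString(0), r.getString(1), r.getString(2).toDouble))
    typed.registerTempTable("sourceTyped")
    val aggRdd = sql("SELECT c1, c2, SUM(c3) FROM sourceTyped GROUP BY c1, c2")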
How to configure build.sbt for Spark 1.2.0
I built Spark 1.2.0 successfully, but was unable to build my Spark program against 1.2.0 with sbt assembly. In my build.sbt I tried: org.apache.spark %% spark-sql % 1.2.0, org.apache.spark %% spark-core % 1.2.0, and org.apache.spark %% spark-sql % 1.2.0-SNAPSHOT, org.apache.spark %% spark-core % 1.2.0-SNAPSHOT, but I get errors like:
[warn] ::
[warn] :: UNRESOLVED DEPENDENCIES ::
[warn] ::
[warn] :: org.apache.spark#spark-sql_2.10;1.2.0: not found
[warn] :: org.apache.spark#spark-core_2.10;1.2.0: not found
[warn] ::
sbt.ResolveException: unresolved dependency: org.apache.spark#spark-sql_2.10;1.2.0: not found
unresolved dependency: org.apache.spark#spark-core_2.10;1.2.0: not found
...
[error] (*:update) sbt.ResolveException: unresolved dependency: org.apache.spark#spark-sql_2.10;1.2.0: not found
[error] unresolved dependency: org.apache.spark#spark-core_2.10;1.2.0: not found
Do I need to configure my build.sbt to point to my local spark 1.2.0 repository? How? Thanks, - Arun
RE: Error reading from Kafka
Hi, I think you have to change the code like this to specify the type info, like this: val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2) You can take a try, actually Kafka unit test also use this API and worked fine. Besides, the fixed issue you mentioned below will only be occurred in Java code calling related API. Thanks Jerry From: Antonio Jesus Navarro [mailto:ajnava...@stratio.com] Sent: Wednesday, October 08, 2014 10:04 PM To: user@spark.apache.org Subject: Error reading from Kafka Hi, I'm trying to read from Kafka. I was able to do it correctly with this method. def createStream( ssc: StreamingContext, zkQuorum: String, groupId: String, topics: Map[String, Int], storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): ReceiverInputDStream[(String, String)] But now I have to add some params to kafka consumer so I've changed to other createStream method but I'm getting an error: 14/10/08 15:34:10 INFO receiver.ReceiverSupervisorImpl: Deregistering receiver 0 14/10/08 15:34:10 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoSuchMethodException: scala.runtime.Nothing$.init(kafka.utils.VerifiableProperties) at java.lang.Class.getConstructor0(Class.java:2849) at java.lang.Class.getConstructor(Class.java:1718) at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:106) at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121) at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) This is my code. It seems that createStream returns ReceiverInputDStream[(Nothing, Nothing)] (forced by me to (string, string)) so, I think that try togetConstructor(kafka.utils.VerifiableProperties) by reflection from Nothing object and don't find the method. val topics = config.getString(nessus.kafka.topics) val numThreads = config.getInt(nessus.kafka.numThreads) val topicMap = topics.split(,).map((_,numThreads.toInt)).toMap val kafkaParams = Map( zookeeper.connect - localhost:2181, group.idhttp://group.id/ - my-grp) val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2) I found that issue https://issues.apache.org/jira/browse/SPARK-2103 https://issues.apache.org/jira/browse/SPARK-2103 but it was solved and I'm using spark 1.1.0 and scala 2.10 so I don't know what happens. Any thoughts? -- [http://www.stratio.com/wp-content/uploads/2014/05/stratio_logo_2014.png]http://www.stratio.com/ Avenida de Europa, 26. Ática 5. 
3ª Planta 28224 Pozuelo de Alarcón, Madrid Tel: +34 91 352 59 42 // @stratiobdhttps://twitter.com/StratioBD
Re: How to configure build.sbt for Spark 1.2.0
Hi Pat, A couple of points: 1) I must have done something naive like: git clone git://github.com/apache/spark.git -b branch-1.2.0 because git branch is telling me I'm on the master branch, and I see that branch-1.2.0 doesn't exist (https://github.com/apache/spark). Nevertheless, when I compiled this master branch, spark-shell tells me I have 1.2.0. So I guess master is currently 1.2.0... 2) The README on the master branch only has build instructions for Maven. I built Spark successfully with mvn -Phadoop-2.2 -Dhadoop.version=2.2.0 -DskipTests clean package and it looks like the publish-local equivalent for Maven is: mvn -Phadoop-2.2 -Dhadoop.version=2.2.0 -DskipTests clean install I will report back with the result. On Wed, Oct 8, 2014 at 5:50 PM, Pat McDonough pat.mcdono...@databricks.com wrote: Hey Arun, Since this build depends on unpublished builds of Spark (1.2.0-SNAPSHOT), you'll need to first build Spark and publish-local so your application build can find those SNAPSHOTs in your local repo. Just append publish-local to your sbt command where you build Spark. -Pat On Wed, Oct 8, 2014 at 5:35 PM, Arun Luthra arun.lut...@gmail.com wrote: I built Spark 1.2.0 successfully, but was unable to build my Spark program under 1.2.0 with sbt assembly and my build.sbt file. In it I tried: "org.apache.spark" %% "spark-sql" % "1.2.0", "org.apache.spark" %% "spark-core" % "1.2.0", and "org.apache.spark" %% "spark-sql" % "1.2.0-SNAPSHOT", "org.apache.spark" %% "spark-core" % "1.2.0-SNAPSHOT", but I get errors like: [warn] :: [warn] :: UNRESOLVED DEPENDENCIES :: [warn] :: [warn] :: org.apache.spark#spark-sql_2.10;1.2.0: not found [warn] :: org.apache.spark#spark-core_2.10;1.2.0: not found [warn] :: sbt.ResolveException: unresolved dependency: org.apache.spark#spark-sql_2.10;1.2.0: not found unresolved dependency: org.apache.spark#spark-core_2.10;1.2.0: not found ... [error] (*:update) sbt.ResolveException: unresolved dependency: org.apache.spark#spark-sql_2.10;1.2.0: not found [error] unresolved dependency: org.apache.spark#spark-core_2.10;1.2.0: not found Do I need to configure my build.sbt to point to my local Spark 1.2.0 repository? How? Thanks, - Arun
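As a reference point, a minimal build.sbt sketch for the workflow Pat describes. It assumes Spark master has already been published locally (sbt publish-local, or the mvn ... clean install command shown above, which puts the artifacts under ~/.m2); the project name and Scala version are illustrative.

name := "my-spark-app"

scalaVersion := "2.10.4"

// SNAPSHOT versions resolve only after Spark itself has been published locally.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.0-SNAPSHOT" % "provided",
  "org.apache.spark" %% "spark-sql"  % "1.2.0-SNAPSHOT" % "provided"
)

// Needed when Spark was installed with Maven rather than sbt publish-local.
resolvers += Resolver.mavenLocal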
Re: Reading from HBase is too slow
Sean, I did specify the number of cores to use as follows: ... ... val sparkConf = new SparkConf().setAppName("Reading HBase").set("spark.cores.max", "32") val sc = new SparkContext(sparkConf) ... ... But that does not solve the problem --- only 2 workers are allocated. I'm using Spark 0.9 and submitting my job through Yarn client mode. Actually, setting *spark.cores.max* only applies when the job runs on a *standalone deploy cluster* or a *Mesos cluster in coarse-grained sharing mode*. Please refer to this link: http://spark.apache.org/docs/0.9.1/configuration.html So how do I specify the number of executors when submitting a Spark 0.9 job in Yarn client mode? 2014-10-08 15:09 GMT+08:00 Sean Owen so...@cloudera.com: You do need to specify the number of executor cores to use. Executors are not like mappers. After all, they may do much more in their lifetime than just read splits from HBase, so it would not make sense to determine it by something that the first line of the program does. On Oct 8, 2014 8:00 AM, Tao Xiao xiaotao.cs@gmail.com wrote: Hi Sean, Do I need to specify the number of executors when submitting the job? I suppose the number of executors will be determined by the number of regions of the table. Just like a MapReduce job, you needn't specify the number of map tasks when reading from an HBase table. The script to submit my job can be seen in my second post. Please refer to that. 2014-10-08 13:44 GMT+08:00 Sean Owen so...@cloudera.com: How did you run your program? I don't see from your earlier post that you ever asked for more executors. On Wed, Oct 8, 2014 at 4:29 AM, Tao Xiao xiaotao.cs@gmail.com wrote: I found the reason why reading HBase is too slow. Although each regionserver serves multiple regions for the table I'm reading, the number of Spark workers allocated by Yarn is too low. Actually, I could see that the table has dozens of regions spread over about 20 regionservers, but only two Spark workers are allocated by Yarn. What is worse, the two workers run one after the other, so the Spark job loses parallelism. So now the question is: why are only 2 workers allocated?
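A minimal sketch of the driver-side setup being discussed, assuming Spark 0.9 in yarn-client mode. The executor count is not controlled by spark.cores.max there; based on the 0.9 running-on-YARN docs it comes from environment variables set before launch. The values below are illustrative assumptions, not taken from the thread:

// Set in the shell before launching, per the Spark 0.9 YARN docs (assumption):
//   SPARK_WORKER_INSTANCES=20   (number of executors; the default of 2 would match the symptom)
//   SPARK_WORKER_CORES=2
//   SPARK_WORKER_MEMORY=2g
import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf().setAppName("Reading HBase")
val sc = new SparkContext(sparkConf)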
Spark SQL parser bug?
Hi - When I run the following Spark SQL query in spark-shell (version 1.1.0): val rdd = sqlContext.sql("SELECT a FROM x WHERE ts >= '2012-01-01T00:00:00' AND ts <= '2012-03-31T23:59:59'") it gives the following error: rdd: org.apache.spark.sql.SchemaRDD = SchemaRDD[294] at RDD at SchemaRDD.scala:103 == Query Plan == == Physical Plan == java.util.NoSuchElementException: head of empty list The ts column in the where clause has timestamp data and is of type timestamp. If I replace the string '2012-01-01T00:00:00' in the where clause with its epoch value, then the query works fine. It looks like I have run into the issue described in this pull request: https://github.com/apache/spark/pull/2084 Is that PR not merged in Spark version 1.1.0? Or am I missing something? Thanks, Mohammed
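A sketch of the workaround the thread itself confirms: comparing the timestamp column against epoch values rather than ISO-8601 strings, since the 1.1.0 parser lacks the string-to-timestamp coercion from the linked PR. The epoch literals below assume UTC and are illustrative:

// 1325376000 ~ 2012-01-01T00:00:00Z, 1333238399 ~ 2012-03-31T23:59:59Z (assumed UTC)
val rdd = sqlContext.sql(
  "SELECT a FROM x WHERE ts >= 1325376000 AND ts <= 1333238399")
rdd.collect().foreach(println)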
JavaPairDStream saveAsTextFile
Hi, I am looking at the documentation for the Java API for streams. The Scala library has an option to save files locally, but the Java version doesn't seem to. The only option I see is saveAsHadoopFiles. Is there a reason why this option was left out of the Java API? http://spark.apache.org/docs/1.0.0/api/java/index.html?org/apache/spark/streaming/dstream/DStream.html Thanks. SA
Re: java.io.IOException Error in task deserialization
This is also happening to me on a regular basis, when the job is large with relatively large serialized objects used in each RDD lineage. A bad thing about this is that this exception always stops the whole job. On Fri, Sep 26, 2014 at 11:17 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: FWIW I suspect that each count operation is an opportunity for you to trigger the bug, and each filter operation increases the likelihood of setting up the bug. I normally don't come across this error until my job has been running for an hour or two and had a chance to build up longer lineages for some RDDs. It sounds like your data is a bit smaller and it's more feasible for you to build up longer lineages more quickly. If you can reduce your number of filter operations (for example by combining some into a single function), that may help. It may also help to introduce persistence or checkpointing at intermediate stages so that the lineages that have to get replayed aren't as long. On Fri, Sep 26, 2014 at 11:10 AM, Arun Ahuja aahuj...@gmail.com wrote: No, for me as well it is non-deterministic. It happens in a piece of code that does many filters and counts on a small set of records (~1k-10k). The original set is persisted in memory and we have a Kryo serializer set for it. The task itself takes in just a few filtering parameters. With the same settings, this has sometimes completed successfully and sometimes failed during this step. Arun On Fri, Sep 26, 2014 at 1:32 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: I've had multiple jobs crash due to java.io.IOException: unexpected exception type; I've been running the 1.1 branch for some time and am now running the 1.1 release binaries. Note that I only use PySpark. I haven't kept detailed notes or the tracebacks around since there are other problems that have caused me greater grief (namely key-not-found errors). For me the exception seems to occur non-deterministically, which is a bit interesting since the error message shows that the same stage has failed multiple times. Are you able to consistently reproduce the bug across multiple invocations at the same place? On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja aahuj...@gmail.com wrote: Has anyone else seen this error in task deserialization? The task is processing a small amount of data and doesn't seem to have much data hanging off the closure?
I've only seen this with Spark 1.1 Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times, most recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com): java.io.IOException: unexpected exception type java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538) java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744)
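A minimal sketch of the mitigation Brad describes above: combining filters and cutting long lineages with persistence and checkpointing so a failed task does not replay the whole chain. sc, the input path, and the keepA/keepB/keepC predicates are hypothetical:

import org.apache.spark.storage.StorageLevel

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")   // assumed HDFS location

val records = sc.textFile("hdfs:///data/events")
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// One combined filter pass instead of many stacked filter().count() steps.
val filtered = records.filter(r => keepA(r) && keepB(r) && keepC(r))

// Checkpointing truncates the lineage: recomputation restarts from the checkpoint,
// not from the original input.
filtered.checkpoint()
val n = filtered.count()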
Re: Spark-SQL: SchemaRDD - ClassCastException
This is a bit strange. When I print the schema for the RDD, it reflects the correct data type for each column. But doing any kind of mathematical calculation seems to result in a ClassCastException. Here is a sample that results in the exception: select c1, c2 ... cast(c18 as int) * cast(c21 as int) ... from table Any other pointers? Thanks for the help. - Ranga On Wed, Oct 8, 2014 at 5:20 PM, Ranga sra...@gmail.com wrote: Sorry. It's 1.1.0. After digging a bit more into this, it seems like the OpenCSV Deserializer converts all the columns to a String type. This may be throwing the execution off. Planning to create a class and map the rows to this custom class. Will keep this thread updated. On Wed, Oct 8, 2014 at 5:11 PM, Michael Armbrust mich...@databricks.com wrote: Which version of Spark are you running? On Wed, Oct 8, 2014 at 4:18 PM, Ranga sra...@gmail.com wrote: Thanks Michael. Should the cast be done in the source RDD or while doing the SUM? To give a better picture, here is the code sequence: val sourceRdd = sql("select ... from source-hive-table") sourceRdd.registerAsTable("sourceRDD") val aggRdd = sql("select c1, c2, sum(c3) from sourceRDD group by c1, c2") // This query throws the exception when I collect the results I tried adding the cast to the aggRdd query above and that didn't help. - Ranga On Wed, Oct 8, 2014 at 3:52 PM, Michael Armbrust mich...@databricks.com wrote: Using SUM on a string should automatically cast the column. Also you can use CAST to change the datatype: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-TypeConversionFunctions What version of Spark are you running? This could be https://issues.apache.org/jira/browse/SPARK-1994 On Wed, Oct 8, 2014 at 3:47 PM, Ranga sra...@gmail.com wrote: Hi, I am in the process of migrating some logic in Pig scripts to Spark SQL. As part of this process, I am creating a few Select...Group By queries and registering them as tables using the SchemaRDD.registerAsTable feature. When using such a registered table in a subsequent Select...Group By query, I get a ClassCastException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer This happens when I use the Sum function on one of the columns. Is there any way to specify the data type for the columns when the registerAsTable function is called? Are there other approaches that I should be looking at? Thanks for your help. - Ranga
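A sketch of the "cast in the source RDD" approach discussed above, assuming a HiveContext on Spark 1.1 (the source is a Hive table); table and column names are illustrative:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
import hiveContext._

// Cast the string columns to integers once, at the source, so the aggregation
// never sees java.lang.String where it expects Integer.
val sourceRdd = sql(
  "SELECT c1, c2, CAST(c18 AS INT) AS c18, CAST(c21 AS INT) AS c21 FROM source_hive_table")
sourceRdd.registerAsTable("sourceRDD")

val aggRdd = sql("SELECT c1, c2, SUM(c18 * c21) FROM sourceRDD GROUP BY c1, c2")
aggRdd.collect().foreach(println)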
Re: Dedup
What is your data like? Are you looking at exact matching, or are you interested in nearly-identical records? Do you need to merge similar records to get a canonical value? Best Regards, Sonal Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal On Thu, Oct 9, 2014 at 2:31 AM, Flavio Pompermaier pomperma...@okkam.it wrote: Maybe you could implement something like this (I don't know if something similar already exists in Spark): http://www.cs.berkeley.edu/~jnwang/papers/icde14_massjoin.pdf Best, Flavio On Oct 8, 2014 9:58 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Multiple values may be different, yet still be considered duplicates depending on how the dedup criteria are selected. Is that correct? Do you care in that case which value you select for a given key? On Wed, Oct 8, 2014 at 3:37 PM, Ge, Yao (Y.) y...@ford.com wrote: I need to do deduplication processing in Spark. The current plan is to generate a tuple where the key is the dedup criteria and the value is the original input. I am thinking of using reduceByKey to discard duplicate values. If I do that, can I simply return the first argument, or should I return a copy of the first argument? Is there a better way to do dedup in Spark? -Yao
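A minimal sketch of the reduceByKey-based dedup described in the thread; returning the first argument directly is fine (no copy needed) since the records are not mutated. The dedupKey function and input path are hypothetical:

def dedupKey(record: String): String = record.split('|')(0)   // illustrative dedup criteria

val records = sc.textFile("hdfs:///data/input")

val deduped = records
  .map(rec => (dedupKey(rec), rec))      // key = dedup criteria, value = original record
  .reduceByKey((first, _) => first)      // keep one arbitrary record per key
  .values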
Re: coalesce with shuffle or repartition is not necessarily fault-tolerant
IIRC, the random number generator is seeded with the partition index, so it will always produce the same result for the same index. Maybe I don't totally follow, though. Could you give a small example of how this might change the RDD ordering in a way that you don't expect? In general, repartition() will not preserve the ordering of an RDD. On Wed, Oct 8, 2014 at 3:42 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: I noticed that repartition will result in non-deterministic lineage because it'll result in changed orders for rows. So for instance, if you do things like: val data = read(...) val k = data.repartition(5) val h = k.repartition(5) It seems that this results in a different ordering of rows for 'k' each time you call it. And because of this different ordering, 'h' will even end up with different partitions, because 'repartition' distributes rows through a random number generator with the 'index' as the key. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
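A small sketch of the point above: repartition() gives no ordering guarantee, so any logic that must be stable across recomputations should impose its own order rather than rely on how rows land in partitions; the input path is illustrative:

val data = sc.textFile("hdfs:///data/input")

val k = data.repartition(5)       // rows are shuffled into 5 partitions; order is arbitrary
val stable = k.sortBy(identity)   // explicit ordering if downstream results must be deterministic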
Re: sparksql connect remote hive cluster
Spark will need to connect both to the Hive metastore and to all HDFS nodes (NN and DNs). If that is all in place then it should work. In this case it looks like maybe it can't connect to a DataNode in HDFS to get the raw data. Keep in mind that the performance might not be very good if you are trying to read large amounts of data over the network. On Wed, Oct 8, 2014 at 5:33 AM, jamborta jambo...@gmail.com wrote: Hi all, just wondering if it is possible to allow Spark to connect to Hive on another, remotely located cluster? I have set up hive-site.xml and amended the hive-metastore URI, and also opened the ports for ZooKeeper, WebHDFS and the Hive metastore. It seems it connects to Hive, then it fails with the following: org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-1886934195-100.73.212.101-1411645855947:blk_1073763904_23146 file=/user/tja01/datasets/00ab46fa4d6711e4afb70003ff41ebbf/part-3 Not sure if some of the ports are not open or if it needs access to additional things. Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sparksql-connect-remote-hive-cluster-tp15928.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
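A minimal sketch of reading from a remote Hive cluster along the lines discussed above, assuming hive-site.xml with the remote hive.metastore.uris is on the classpath and that the remote NameNode and every DataNode are reachable from the Spark nodes; the table name is illustrative:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

// The metastore only supplies metadata; the rows themselves stream from the remote
// DataNodes, which is why a blocked DataNode port surfaces as BlockMissingException.
val rows = hiveContext.sql("SELECT * FROM remote_db.some_table LIMIT 10")
rows.collect().foreach(println)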
Re: How to configure build.sbt for Spark 1.2.0
There is not yet a 1.2.0 branch; there is no 1.2.0 release. master is 1.2.0-SNAPSHOT, not 1.2.0. Your final command is correct, but it's redundant to 'package' and then throw that away with another 'clean'. Just the final command with '... clean install' is needed. On Thu, Oct 9, 2014 at 2:12 AM, Arun Luthra arun.lut...@gmail.com wrote: Hi Pat, A couple of points: 1) I must have done something naive like: git clone git://github.com/apache/spark.git -b branch-1.2.0 because git branch is telling me I'm on the master branch, and I see that branch-1.2.0 doesn't exist (https://github.com/apache/spark). Nevertheless, when I compiled this master branch, spark-shell tells me I have 1.2.0. So I guess master is currently 1.2.0... 2) The README on the master branch only has build instructions for Maven. I built Spark successfully with mvn -Phadoop-2.2 -Dhadoop.version=2.2.0 -DskipTests clean package and it looks like the publish-local equivalent for Maven is: mvn -Phadoop-2.2 -Dhadoop.version=2.2.0 -DskipTests clean install I will report back with the result. On Wed, Oct 8, 2014 at 5:50 PM, Pat McDonough pat.mcdono...@databricks.com wrote: Hey Arun, Since this build depends on unpublished builds of Spark (1.2.0-SNAPSHOT), you'll need to first build Spark and publish-local so your application build can find those SNAPSHOTs in your local repo. Just append publish-local to your sbt command where you build Spark. -Pat - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org