Re: column expression in left outer join for DataFrame
Hi,

Thanks for your response. I modified my code as per your suggestion, but now I am getting a runtime error. Here's my code:

    val df_1 = df.filter(df("event") === 0).select("country", "cnt")
    val df_2 = df.filter(df("event") === 3).select("country", "cnt")

    df_1.show()
    // produces the following output:
    // country  cnt
    // tw       3000
    // uk       2000
    // us       1000

    df_2.show()
    // produces the following output:
    // country  cnt
    // tw       25
    // uk       200
    // us       95

    val both = df_2.join(df_1, df_2("country") === df_1("country"), "left_outer")

I am getting the following error when executing the join statement:

    java.util.NoSuchElementException: next on empty iterator

This error seems to originate at DataFrame.join (line 133 in DataFrame.scala).

The show() results show that both DataFrames have a column named country and that they are non-empty. I also tried the simpler join (i.e. df_2.join(df_1)) and got the same error.

I would like to know what is wrong with the join statement above.

thanks

On Tue, Mar 24, 2015 at 6:08 PM, Michael Armbrust mich...@databricks.com wrote:

> You need to use `===`, so that you are constructing a column expression instead of evaluating the standard Scala equality method. Calling methods to access columns (i.e. df.country) is only supported in Python.
>
>     val join_df = df1.join(df2, df1("country") === df2("country"), "left_outer")
>
> On Tue, Mar 24, 2015 at 5:50 PM, SK skrishna...@gmail.com wrote:
>
> > Hi,
> >
> > I am trying to port some code that was working in Spark 1.2.0 to the latest version, Spark 1.3.0. This code involves a left outer join between two SchemaRDDs, which I am now trying to change to a left outer join between 2 DataFrames. I followed the example for left outer join of DataFrame at https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html
> >
> > Here's my code, where df1 and df2 are the 2 DataFrames I am joining on the country field:
> >
> >     val join_df = df1.join(df2, df1.country == df2.country, "left_outer")
> >
> > But I got a compilation error that value country is not a member of sql.DataFrame.
> >
> > I also tried the following:
> >
> >     val join_df = df1.join(df2, df1("country") == df2("country"), "left_outer")
> >
> > I got a compilation error that it is a Boolean whereas a Column is required.
> >
> > So what is the correct Column expression I need to provide for joining the 2 DataFrames on a specific field?
> >
> > thanks
> >
> > --
> > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/column-expression-in-left-outer-join-for-DataFrame-tp22209.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
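
To illustrate the point about `===` versus `==` made in the reply above, here is a toy sketch in plain Scala with no Spark dependency. The names Expr, Col, and EqualTo are hypothetical stand-ins invented for this example; they are not Spark's classes and say nothing about how Spark implements Column internally. The sketch only shows the general idea: `==` is evaluated on the spot and yields a Boolean, while a user-defined `===` can return an object that describes the comparison for later evaluation.

```scala
// Toy model only: Expr, Col, and EqualTo are hypothetical names, not Spark classes.
sealed trait Expr

final case class Col(name: String) extends Expr {
  // Builds an expression node describing the comparison instead of performing it.
  def ===(other: Expr): Expr = EqualTo(this, other)
}

final case class EqualTo(left: Expr, right: Expr) extends Expr

object EqualityDemo {
  def main(args: Array[String]): Unit = {
    val left  = Col("df1.country")
    val right = Col("df2.country")

    // Standard Scala equality: decided immediately, result is a Boolean.
    val evaluated: Boolean = left == right

    // Custom operator: result is an expression tree that a query engine could evaluate per row.
    val deferred: Expr = left === right

    println(evaluated)
    println(deferred)
  }
}
```

This mirrors the compilation error reported in the thread: an API that expects an expression object cannot accept the Boolean that `==` produces.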
Re: column expression in left outer join for DataFrame
Hi,

Thanks for your response. I am not clear about why the query is ambiguous.

    val both = df_2.join(df_1, df_2("country") === df_1("country"), "left_outer")

I thought df_2("country") === df_1("country") indicates that the country field in the 2 DataFrames should match, with df_2("country") being the equivalent of df_2.country in SQL and df_1("country") the equivalent of df_1.country in SQL. So I am not sure why it is ambiguous. In Spark 1.2.0 I used the same logic with Spark SQL and tables (e.g. WHERE tab1.country = tab2.country) and had no problems getting the correct result.

thanks

On Wed, Mar 25, 2015 at 11:05 AM, Michael Armbrust mich...@databricks.com wrote:

> Unfortunately you are now hitting a bug (that is fixed in master and will be released in 1.3.1, hopefully next week). However, even with that fix your query is still ambiguous and you will need to use aliases:
>
>     val df_1 = df.filter(df("event") === 0).select("country", "cnt").as("a")
>     val df_2 = df.filter(df("event") === 3).select("country", "cnt").as("b")
>     val both = df_2.join(df_1, $"a.country" === $"b.country", "left_outer")
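
The alias-based join suggested in this thread can be sketched as a small standalone program. This is a minimal sketch under stated assumptions, not the code from the thread: it assumes a Spark 2.x or later build with the spark-sql dependency on the classpath and uses SparkSession, which post-dates the Spark 1.3 release discussed here. The object name AliasedSelfJoin, the helper joinByCountry, and the sample rows are made up for illustration. A plausible reading of the ambiguity (the thread does not spell it out) is that both filtered frames derive from the same parent, so unaliased references to country can resolve to the same underlying column.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical example object; assumes Spark 2.x+ (spark-sql) on the classpath.
object AliasedSelfJoin {

  // Left-outer-joins the event-3 rows to the event-0 rows of the same parent DataFrame.
  def joinByCountry(spark: SparkSession, df: DataFrame): DataFrame = {
    import spark.implicits._ // enables the $"..." column syntax

    // Give each side its own alias so qualified column names are unambiguous.
    val a = df.filter($"event" === 0).select("country", "cnt").as("a")
    val b = df.filter($"event" === 3).select("country", "cnt").as("b")

    b.join(a, $"a.country" === $"b.country", "left_outer")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("AliasedSelfJoin")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // Sample rows modelled on the show() output quoted in the thread.
    val df = Seq(
      ("tw", 0, 3000), ("uk", 0, 2000), ("us", 0, 1000),
      ("tw", 3, 25),   ("uk", 3, 200),  ("us", 3, 95)
    ).toDF("country", "event", "cnt")

    joinByCountry(spark, df).show()
    spark.stop()
  }
}
```

The join condition refers to the aliases ("a.country", "b.country") rather than to the DataFrame values, which is what lets each side of the comparison be told apart.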
Re: Spark Streaming: Sentiment Analysis of Twitter streams
Hi,

I am using 1.1.0. I did set my Twitter credentials and I am using the full path. I did not paste this in the public post. I am running on a cluster and getting the exception. Are you running in local or standalone mode?

Thanks

On Oct 15, 2014 3:20 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

> I just ran the same code and it is running perfectly fine on my machine. These are the things on my end:
>
> - Spark version: 1.1.0
> - Gave full path to the negative and positive files
> - Set twitter auth credentials in the environment.
>
> And here's the code:
>
>     import org.apache.spark.SparkContext
>     import org.apache.spark.SparkContext._
>     import org.apache.spark.SparkConf
>     import org.apache.spark.streaming.twitter.TwitterUtils
>     import org.apache.spark.streaming.{Seconds, StreamingContext}
>
>     object Sentimenter {
>       def main(args: Array[String]) {
>         System.setProperty("twitter4j.oauth.consumerKey", "X");
>         System.setProperty("twitter4j.oauth.consumerSecret", "X");
>         System.setProperty("twitter4j.oauth.accessToken", "");
>         System.setProperty("twitter4j.oauth.accessTokenSecret", "XXX");
>
>         val filters = new Array[String](2)
>         filters(0) = "ebola"
>         filters(1) = "isis"
>
>         val sparkConf = new SparkConf().setAppName("TweetSentiment").setMaster("local[2]")
>         val sc = new SparkContext(sparkConf)
>
>         // get the list of positive words
>         val pos_list = sc.textFile("file:///home/akhld/positive-words.txt") //Random
>           .filter(line => !line.isEmpty())
>           .collect()
>           .toSet
>
>         // get the list of negative words
>         val neg_list = sc.textFile("file:///home/akhld/negative-words.txt") //Random
>           .filter(line => !line.isEmpty())
>           .collect()
>           .toSet
>
>         // create twitter stream
>         val ssc = new StreamingContext(sparkConf, Seconds(5))
>         val stream = TwitterUtils.createStream(ssc, None, filters)
>         val tweets = stream.map(r => r.getText)
>         tweets.print() // print tweet text
>
>         ssc.start()
>         ssc.awaitTermination()
>       }
>     }
>
> Thanks
> Best Regards
>
> On Wed, Oct 15, 2014 at 1:43 AM, SK skrishna...@gmail.com wrote:
>
> > Hi,
> >
> > I am trying to implement simple sentiment analysis of Twitter streams in Spark/Scala. I am getting an exception, and it appears when I combine SparkContext with StreamingContext in the same program. When I read the positive and negative words using only SparkContext.textFile (without creating a StreamingContext) and analyze static text files, the program works. Likewise, when I just create the Twitter stream using StreamingContext (and don't create a SparkContext to create the vocabulary), the program works. The exception seems to appear when I combine both SparkContext and StreamingContext in the same program, and I am not sure whether we are allowed to have both simultaneously. All the examples in the streaming module contain only the StreamingContext.
> >
> > The error transcript and my code appear below. I would appreciate your guidance in fixing this error and on the right way to read static files and streams in the same program, or any pointers to relevant examples.
> >
> > Thanks.
> >
> > -- Error transcript --
> >
> >     Lost task 0.0 in stage 2.0 (TID 70, mesos4-dev.sccps.net): java.io.IOException: unexpected exception type
> >         java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
> >         java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
> >         java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> >         java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> >         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >         java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> >         java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> >         java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> >         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >         java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> >         org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
> >         org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
> >         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
> >         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >         java.lang.Thread.run(Thread.java:745)
> >
> > -- My code below --
> >
> >     object TweetSentiment {
> >       def main(args: Array[String]) {
> >         val filters = args
> >         val sparkConf = new SparkConf().setAppName("TweetSentiment")
> >         val sc = new SparkContext(sparkConf)
> >
> >         // get the list of positive words
> >         val pos_list = sc.textFile("positive-words.txt")
> >           .filter(line => !line.isEmpty())