Re: column expression in left outer join for DataFrame

2015-03-25 Thread S Krishna
Hi,

Thanks for your response. I am not clear about why the query is ambiguous.

val both = df_2.join(df_1, df_2("country") === df_1("country"), "left_outer")

I thought df_2("country") === df_1("country") indicates that the country
field in the two DataFrames should match, and that df_2("country") is the
equivalent of df_2.country in SQL, while df_1("country") is the equivalent
of df_1.country. So I am not sure why it is ambiguous. In Spark 1.2.0 I
used the same logic with Spark SQL and tables (e.g. "WHERE tab1.country =
tab2.country") and had no problems getting the correct result.
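
For what it's worth, a minimal sketch (not from the original thread; it
assumes Spark 1.3.x with a sqlContext in scope) of why the SQL route that
worked in 1.2.0 stays unambiguous: registering the two frames as separate
tables gives each column a distinct qualifier, which is exactly what the
.as("a")/.as("b") aliases in the reply below do for the DataFrame API:

df_1.registerTempTable("tab1")
df_2.registerTempTable("tab2")
val both = sqlContext.sql(
  "SELECT * FROM tab2 LEFT OUTER JOIN tab1 ON tab1.country = tab2.country")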

thanks

On Wed, Mar 25, 2015 at 11:05 AM, Michael Armbrust wrote:

> Unfortunately you are now hitting a bug (it is fixed in master and will be
> released in 1.3.1, hopefully next week). However, even with that fix, your
> query is still ambiguous and you will need to use aliases:
>
> val df_1 = df.filter(df("event") === 0)
>   .select("country", "cnt").as("a")
> val df_2 = df.filter(df("event") === 3)
>   .select("country", "cnt").as("b")
> val both = df_2.join(df_1, $"a.country" === $"b.country", "left_outer")
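>
> (A side note, assuming the default Spark 1.3 shell/SQLContext setup: the
> $"..." column syntax above requires the implicits import.)
>
> import sqlContext.implicits._  // enables $"colName" as a Column reference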
>
>
>
> On Tue, Mar 24, 2015 at 11:57 PM, S Krishna wrote:
>
>> Hi,
>>
>> Thanks for your response. I modified my code as per your suggestion, but
>> now I am getting a runtime error. Here's my code:
>>
>> val df_1 = df.filter(df("event") === 0)
>>   .select("country", "cnt")
>>
>> val df_2 = df.filter(df("event") === 3)
>>   .select("country", "cnt")
>>
>> df_1.show()
>> // produces the following output:
>> // country   cnt
>> //      tw  3000
>> //      uk  2000
>> //      us  1000
>>
>> df_2.show()
>> // produces the following output:
>> // country   cnt
>> //      tw    25
>> //      uk   200
>> //      us    95
>>
>> val both = df_2.join(df_1, df_2("country") === df_1("country"), "left_outer")
>>
>> I am getting the following error when executing the join statement:
>>
>> java.util.NoSuchElementException: next on empty iterator.
>>
>> This error seems to be originating at DataFrame.join (line 133 in
>> DataFrame.scala).
>>
>> The show() results show that both DataFrames do have columns named
>> "country" and that they are non-empty. I also tried the simpler join
>> (i.e. df_2.join(df_1)) and got the same error stated above.
>>
>> I would like to know what is wrong with the join statement above.
>>
>> thanks
>>
>> On Tue, Mar 24, 2015 at 6:08 PM, Michael Armbrust wrote:
>>
>>> You need to use `===`, so that you are constructing a column expression
>>> instead of evaluating the standard Scala equality method. Accessing columns
>>> as attributes (i.e. df.country) is only supported in Python.
>>>
>>> val join_df = df1.join(df2, df1("country") === df2("country"), "left_outer")
>>>
>>> On Tue, Mar 24, 2015 at 5:50 PM, SK wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to port some code that was working in Spark 1.2.0 to the
>>>> latest version, Spark 1.3.0. This code involves a left outer join between
>>>> two SchemaRDDs, which I am now trying to change to a left outer join
>>>> between two DataFrames. I followed the example for a left outer join of
>>>> DataFrames at
>>>>
>>>> https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html
>>>>
>>>> Here's my code, where df1 and df2 are the two DataFrames I am joining
>>>> on the "country" field:
>>>>
>>>>  val join_df = df1.join(df2, df1.country == df2.country, "left_outer")
>>>>
>>>> But I got a compilation error that value country is not a member of
>>>> sql.DataFrame.
>>>>
>>>> I also tried the following:
>>>>
>>>>  val join_df = df1.join(df2, df1("country") == df2("country"), "left_outer")
>>>>
>>>> I got a compilation error that it is a Boolean whereas a Column is
>>>> required.
>>>>
>>>> So what is the correct Column expression I need to provide for joining
>>>> the two DataFrames on a specific field?
>>>>
>>>> thanks
>>>>


Re: column expression in left outer join for DataFrame

2015-03-24 Thread S Krishna
Hi,

Thanks for your response. I modified my code as per your suggestion, but
now I am getting a runtime error. Here's my code:

val df_1 = df.filter(df("event") === 0)
  .select("country", "cnt")

val df_2 = df.filter(df("event") === 3)
  .select("country", "cnt")

df_1.show()
// produces the following output:
// country   cnt
//      tw  3000
//      uk  2000
//      us  1000

df_2.show()
// produces the following output:
// country   cnt
//      tw    25
//      uk   200
//      us    95

val both = df_2.join(df_1, df_2("country") === df_1("country"), "left_outer")

I am getting the following error when executing the join statement:

java.util.NoSuchElementException: next on empty iterator.

This error seems to be originating at DataFrame.join (line 133 in
DataFrame.scala).

The show() results show that both DataFrames do have columns named
"country" and that they are non-empty. I also tried the simpler join
(i.e. df_2.join(df_1)) and got the same error stated above.

I would like to know what is wrong with the join statement above.
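
A possible workaround, sketched here only as a hedged illustration (assuming
Spark 1.3.0; note that df_1 and df_2 both derive from the same df, so their
"country" columns resolve to the same underlying attribute): renaming one
side's join column gives the analyzer two clearly distinct columns. The
reply elsewhere in this thread notes that the NoSuchElementException itself
is a 1.3.0 bug fixed for 1.3.1, so the rename alone may not be enough.

val df_1r = df_1.withColumnRenamed("country", "country_1")
val both  = df_2.join(df_1r, df_2("country") === df_1r("country_1"), "left_outer")

// With the sample data above, the left outer join keyed on country would be
// expected to produce rows along these lines (two cnt columns, one per side):
//   country  cnt  country_1  cnt
//        tw   25         tw  3000
//        uk  200         uk  2000
//        us   95         us  1000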

thanks

On Tue, Mar 24, 2015 at 6:08 PM, Michael Armbrust wrote:

> You need to use `===`, so that you are constructing a column expression
> instead of evaluating the standard Scala equality method. Accessing columns
> as attributes (i.e. df.country) is only supported in Python.
>
> val join_df = df1.join(df2, df1("country") === df2("country"), "left_outer")
>
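> As a rough illustration of the type difference (a hypothetical snippet, not
> from the original thread):
>
> df1("country") === df2("country")  // org.apache.spark.sql.Column, usable as a join condition
> df1("country") ==  df2("country")  // plain Scala Boolean, which join() cannot accept
>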
> On Tue, Mar 24, 2015 at 5:50 PM, SK wrote:
>
>> Hi,
>>
>> I am trying to port some code that was working in Spark 1.2.0 to the
>> latest version, Spark 1.3.0. This code involves a left outer join between
>> two SchemaRDDs, which I am now trying to change to a left outer join
>> between two DataFrames. I followed the example for a left outer join of
>> DataFrames at
>>
>> https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html
>>
>> Here's my code, where df1 and df2 are the two DataFrames I am joining on
>> the "country" field:
>>
>>  val join_df = df1.join(df2, df1.country == df2.country, "left_outer")
>>
>> But I got a compilation error that value country is not a member of
>> sql.DataFrame.
>>
>> I also tried the following:
>>
>>  val join_df = df1.join(df2, df1("country") == df2("country"), "left_outer")
>>
>> I got a compilation error that it is a Boolean whereas a Column is
>> required.
>>
>> So what is the correct Column expression I need to provide for joining
>> the two DataFrames on a specific field?
>>
>> thanks
>>


Re: Spark Streaming: Sentiment Analysis of Twitter streams

2014-10-15 Thread S Krishna
Hi,

I am using 1.1.0. I did set my twitter credentials and I am using the full
path. I did not paste this in the public post. I am running on a cluster
and getting the exception. Are you running in local or standalone mode?

Thanks
On Oct 15, 2014 3:20 AM, "Akhil Das" wrote:

> I just ran the same code and it is running perfectly fine on my machine.
> These are the things on my end:
>
> - Spark version: 1.1.0
> - Gave full path to the negative and positive files
> - Set twitter auth credentials in the environment.
>
> And here's the code:
>
> import org.apache.spark.SparkContext
>> import org.apache.spark.SparkContext._
>> import org.apache.spark.SparkConf
>> import org.apache.spark.streaming.twitter.TwitterUtils
>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>> object Sentimenter {
>>   def main(args: Array[String]) {
>>     System.setProperty("twitter4j.oauth.consumerKey", "X")
>>     System.setProperty("twitter4j.oauth.consumerSecret", "X")
>>     System.setProperty("twitter4j.oauth.accessToken", "")
>>     System.setProperty("twitter4j.oauth.accessTokenSecret", "XXX")
>>
>>     val filters = new Array[String](2)
>>     filters(0) = "ebola"
>>     filters(1) = "isis"
>>
>>     val sparkConf = new SparkConf().setAppName("TweetSentiment").setMaster("local[2]")
>>     val sc = new SparkContext(sparkConf)
>>
>>     // get the list of positive words
>>     val pos_list = sc.textFile("file:///home/akhld/positive-words.txt")
>>       .filter(line => !line.isEmpty())
>>       .collect()
>>       .toSet
>>
>>     // get the list of negative words
>>     val neg_list = sc.textFile("file:///home/akhld/negative-words.txt")
>>       .filter(line => !line.isEmpty())
>>       .collect()
>>       .toSet
>>
>>     // create the twitter stream
>>     val ssc = new StreamingContext(sparkConf, Seconds(5))
>>     val stream = TwitterUtils.createStream(ssc, None, filters)
>>     val tweets = stream.map(r => r.getText)
>>     tweets.print() // print tweet text
>>
>>     ssc.start()
>>     ssc.awaitTermination()
>>   }
>> }
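>
> As a hedged extension of the sketch above (not part of the original code;
> it assumes the same pos_list, neg_list, and tweets values): broadcasting
> the word sets avoids re-shipping them with every task, and shows how they
> would actually score the stream.
>
>> val posBC = sc.broadcast(pos_list)
>> val negBC = sc.broadcast(neg_list)
>> val scores = tweets.map { t =>
>>   val words = t.toLowerCase.split("\\s+").toSet
>>   // (positive hits, negative hits) per tweet
>>   (words.count(posBC.value.contains), words.count(negBC.value.contains))
>> }
>> scores.print()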
>
> Thanks
> Best Regards
>
> On Wed, Oct 15, 2014 at 1:43 AM, SK wrote:
>
>> Hi,
>>
>> I am trying to implement simple sentiment analysis of Twitter streams in
>> Spark/Scala. I am getting an exception, and it appears when I combine a
>> SparkContext with a StreamingContext in the same program. When I read the
>> positive and negative words using only SparkContext.textFile (without
>> creating a StreamingContext) and analyze static text files, the program
>> works. Likewise, when I just create the twitter stream using a
>> StreamingContext (and don't create a SparkContext to build the
>> vocabulary), the program works. The exception appears only when I combine
>> both a SparkContext and a StreamingContext in the same program, and I am
>> not sure whether we are allowed to have both simultaneously. All the
>> examples in the streaming module contain only the StreamingContext. The
>> error transcript and my code appear below. I would appreciate your
>> guidance in fixing this error, on the right way to read static files and
>> streams in the same program, and any pointers to relevant examples.
>> Thanks.
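>>
>> (A minimal sketch, not from the original thread, assuming Spark 1.1.x: the
>> usual way to combine the two is to build the StreamingContext from the
>> existing SparkContext rather than from the conf, so that only one
>> SparkContext ever exists in the application.)
>>
>> val sc  = new SparkContext(sparkConf)
>> val ssc = new StreamingContext(sc, Seconds(5))  // reuses sc instead of creating a second context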
>>
>>
>> ---- Error transcript ----
>> Lost task 0.0 in stage 2.0 (TID 70, mesos4-dev.sccps.net):
>> java.io.IOException: unexpected exception type
>>   java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
>>   java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
>>   java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>   java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>   java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>   java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>   java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>   java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>   java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>   java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>   org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>>   org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
>>   org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
>>   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>   java.lang.Thread.run(Thread.java:745)
>> ---- My code below ----
>> object TweetSentiment {
>>   def main(args: Array[String]) {
>>
>>     val filters = args
>>     val sparkConf = new SparkConf().setAppName("TweetSent