Your original code snippet seems incomplete, and there isn't enough
information to tell what problem you actually ran into: in the snippet you
sent, the rdd variable is well defined, but the df variable is not defined
anywhere.

One way to make this work is shown below. Until the last line executes,
nothing is actually collected on the driver, and if your dataset is too
big to collect on the driver for inspection, just do a take(n) on the
result instead.
from pyspark.sql import Row, SQLContext
from pyspark.sql.functions import count

sqlContext = SQLContext(sc)

# Convert a list of IPs into a DataFrame with a single column "ip".
Record = Row("ip")
df = sc.parallelize(map(lambda x: Record(x),
                        ['208.51.22.18', '31.207.6.173', '208.51.22.18'])).toDF()

# Compute ip -> frequency and inspect the result.
df.groupBy(df.ip).agg(count(df.ip)).show()
+------------+---------+
| ip|COUNT(ip)|
+------------+---------+
|208.51.22.18| 2|
|31.207.6.173| 1|
+------------+---------+
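
If the aggregated result were too large to show() or collect on the
driver, you could pull back just the first few rows instead; a minimal
sketch continuing from the df above (the name "freqs" is just for
illustration):

freqs = df.groupBy(df.ip).agg(count(df.ip))
# take(n) only brings n rows back to the driver, not the whole result
print(freqs.take(2))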
What exactly is the issue you run into when you say it doesn't get
through?
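
In the meantime, Holden's two suggestions below would look roughly like
this; a minimal sketch, assuming your df is the reference table with
columns ip and freq, that it fits in driver memory for the broadcast
case, and with "lookup" and "kvrdd" as illustrative names only:

# Option 1: broadcast the small frequency table as a dict and look it
# up map-side, so no DataFrame is touched inside the map.
lookup = sc.broadcast(dict(df.map(lambda r: (r.ip, r.freq)).collect()))
freqs = rdd.map(lambda x: lookup.value.get(x))

# Option 2: turn the table into a (key, value) RDD and join against it.
kvrdd = df.map(lambda r: (r.ip, r.freq))
# join yields (ip, (1, freq)); keep just the freq.
freqs = rdd.map(lambda x: (x, 1)).join(kvrdd).map(lambda kv: kv[1][1])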
On Thu, May 21, 2015 at 10:47 AM, ping yan <[email protected]> wrote:
> Thanks. I suspected that, but figured that querying the df inside a map
> sounds so intuitive that I didn't want to just give up.
>
> I've tried join and even better with a DStream.transform() and it works!
> freqs = testips.transform(lambda rdd: rdd.join(kvrdd).map(lambda (x,y):
> y[1]))
>
> Thank you for the help!
>
> Ping
>
> On Thu, May 21, 2015 at 10:40 AM, Holden Karau <[email protected]>
> wrote:
>
>> So DataFrames, like RDDs, can only be accessed from the driver. If your
>> IP Frequency table is small enough you could collect it and distribute it
>> as a hashmap with broadcast or you could also join your rdd with the ip
>> frequency table. Hope that helps :)
>>
>>
>> On Thursday, May 21, 2015, ping yan <[email protected]> wrote:
>>
>>> I have a dataframe as a reference table for IP frequencies.
>>> e.g.,
>>>
>>> ip freq
>>> 10.226.93.67 1
>>> 10.226.93.69 1
>>> 161.168.251.101 4
>>> 10.236.70.2 1
>>> 161.168.251.105 14
>>>
>>>
>>> All I need is to query the df in a map.
>>>
>>> rdd = sc.parallelize(['208.51.22.18', '31.207.6.173', '208.51.22.18'])
>>>
>>> freqs = rdd.map(lambda x: df.where(df.ip == x).first())
>>>
>>> It doesn't get through.. would appreciate any help.
>>>
>>> Thanks!
>>> Ping
>>>
>>>
>>>
>>>
>>> --
>>> Ping Yan
>>> Ph.D. in Management
>>> Dept. of Management Information Systems
>>> University of Arizona
>>> Tucson, AZ 85721
>>>
>>>
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>> Linked In: https://www.linkedin.com/in/holdenkarau
>>
>>
>
>
> --
> Ping Yan
> Ph.D. in Management
> Dept. of Management Information Systems
> University of Arizona
> Tucson, AZ 85721
>
>