OK let us try this

1) We have a CSV file called cities.csv, as below

country,city,population
Germany,Berlin,3520031
Germany,Hamburg,1787408
Germany,Munich,1450381
Turkey,Ankara,4587558
Turkey,Istanbul,14025646
Turkey,Izmir,2847691
United States,Chicago IL,2670406
United States,Los Angeles CA,085014
United States,New York City NY,8622357

2) Put this in HDFS as below

hdfs dfs -put cities.csv /data/stg/test

Read it into a DataFrame in PySpark as below

    csv_file = "hdfs://rhes75:9000/data/stg/test/cities.csv"
    # read the CSV file from HDFS into a Spark DataFrame
    listing_df = (
        spark.read.format("csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .load(csv_file)
    )
    listing_df.printSchema()
    print(f"""\n Reading from Hive table {csv_file}\n""")
    listing_df.show(100, False)

3) Create a Spark temp view from the DataFrame. I call it temp

 print(f"""\n Reading from temp table temp created on listing_df\n""")
 listing_df.createOrReplaceTempView("temp")

4) Use standard SQL with windowing to get the result out

    # rank cities within each country, largest population first, and keep rank 1
    sqltext = """
    SELECT country, city
    FROM
    (
    SELECT
              country
            , city
            , population
            , DENSE_RANK() OVER (PARTITION BY country ORDER BY population DESC) AS rnk
    FROM temp
    )
    WHERE rnk = 1
    ORDER BY population DESC
    """
    spark.sql(sqltext).show()
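
A minimal sketch for sanity-checking the ranking logic without a cluster. It assumes Python's built-in sqlite3 module as a stand-in for Spark SQL (SQLite 3.25 or later is needed for window functions), so it is not the Spark code itself. It ranks cities by population in descending order within each country and keeps rank 1, which does not depend on every country having the same number of cities. The table name cities, the alias rnk and the helper largest_cities are illustrative names, and the population figures are the ones posted in the original question.

```python
import sqlite3

# Sample rows from the thread (populations as posted in the original question).
ROWS = [
    ("Germany", "Berlin", 3520031),
    ("Germany", "Hamburg", 1787408),
    ("Germany", "Munich", 1450381),
    ("Turkey", "Ankara", 4587558),
    ("Turkey", "Istanbul", 14025646),
    ("Turkey", "Izmir", 2847691),
    ("United States", "Chicago IL", 2670406),
    ("United States", "Los Angeles CA", 4085014),
    ("United States", "New York City NY", 8622357),
]

# Rank cities per country, largest first, and keep only rank 1.
SQL = """
SELECT country, city
FROM (
    SELECT country, city, population,
           DENSE_RANK() OVER (PARTITION BY country ORDER BY population DESC) AS rnk
    FROM cities
) AS ranked
WHERE rnk = 1
ORDER BY population DESC
"""


def largest_cities(rows):
    """Load rows into an in-memory SQLite table and run the window query."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE cities (country TEXT, city TEXT, population INTEGER)")
        conn.executemany("INSERT INTO cities VALUES (?, ?, ?)", rows)
        return conn.execute(SQL).fetchall()
    finally:
        conn.close()


if __name__ == "__main__":
    for country, city in largest_cities(ROWS):
        print(country, city)
```

Because DENSE_RANK is used, two cities tied for the largest population in a country would both be returned; ROW_NUMBER would be the choice if exactly one row per country is required.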

5) Let us test it

root
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- population: double (nullable = true)


 Reading from Hive table hdfs://rhes75:9000/data/stg/test/cities.csv

+-------------+----------------+-----------+
|country      |city            |population |
+-------------+----------------+-----------+
|Germany      |Berlin          |3520031.0  |
|Germany      |Hamburg         |1787408.0  |
|Germany      |Munich          |1450381.0  |
|Turkey       |Ankara          |4587558.0  |
|Turkey       |Istanbul        |1.4025646E7|
|Turkey       |Izmir           |2847691.0  |
|United States|Chicago IL      |2670406.0  |
|United States|Los Angeles CA  |85014.0    |
|United States|New York City NY|8622357.0  |
+-------------+----------------+-----------+


 Reading from temp table temp created on listing_df

+-------------+----------------+
|      country|            city|
+-------------+----------------+
|       Turkey|        Istanbul|
|United States|New York City NY|
|      Germany|          Berlin|
+-------------+----------------+

The code is attached.

I am sure it can be improved.
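
On the cost concern raised earlier in the thread (sorting every group just to keep one row): conceptually, a per-group maximum needs only one pass over the data and one "best so far" entry per country, with no sort and no join. The sketch below shows that idea in plain Python; it is not Spark code, and the function name largest_city_per_country is hypothetical. In Spark, an aggregation along the lines of a max over a (population, city) struct grouped by country should follow the same pattern, though that variant is not tested here.

```python
def largest_city_per_country(rows):
    """Return {country: city}, keeping the most populous city per country.

    rows is an iterable of (country, city, population) tuples.
    """
    best = {}  # country -> (population, city) seen so far
    for country, city, population in rows:
        current = best.get(country)
        # Replace the stored entry only when a strictly larger city appears.
        if current is None or population > current[0]:
            best[country] = (population, city)
    return {country: city for country, (_, city) in best.items()}


# Sample rows from the thread (populations as posted in the original question).
ROWS = [
    ("Germany", "Berlin", 3520031),
    ("Germany", "Hamburg", 1787408),
    ("Germany", "Munich", 1450381),
    ("Turkey", "Ankara", 4587558),
    ("Turkey", "Istanbul", 14025646),
    ("Turkey", "Izmir", 2847691),
    ("United States", "Chicago IL", 2670406),
    ("United States", "Los Angeles CA", 4085014),
    ("United States", "New York City NY", 8622357),
]

if __name__ == "__main__":
    for country, city in largest_city_per_country(ROWS).items():
        print(country, city)
```

With ties, this version keeps the first city encountered, which differs from a DENSE_RANK-based query that would return every tied city.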



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 20 Dec 2022 at 20:35, Oliver Ruebenacker <oliv...@broadinstitute.org>
wrote:

>
>      Hello,
>
>   Let's say the data is like this:
>
> +---------------+-------------------+------------+
> | country       | city              | population |
> +---------------+-------------------+------------+
> | Germany       | Berlin            | 3520031    |
> | Germany       | Hamburg           | 1787408    |
> | Germany       | Munich            | 1450381    |
> | Turkey        | Ankara            | 4587558    |
> | Turkey        | Istanbul          | 14025646   |
> | Turkey        | Izmir             | 2847691    |
> | United States | Chicago, IL       | 2670406    |
> | United States | Los Angeles, CA   | 4085014    |
> | United States | New York City, NY | 8622357    |
> +---------------+-------------------+------------+
>
> I want to get the largest city in each country:
>
> +---------------+-------------------+
> | country       | city              |
> +---------------+-------------------+
> | Germany       | Berlin            |
> | Turkey        | Istanbul          |
> | United States | New York City, NY |
> +---------------+-------------------+
>
> Thanks!
>
>      Best, Oliver
>
> On Tue, Dec 20, 2022 at 5:52 AM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Windowing functions were invented to avoid doing lengthy group by etc.
>>
>> As usual there is a lot of heat but little light
>>
>> Please provide:
>>
>>
>>    1. Sample input. I gather this data is stored in some csv, tsv, table
>>    format
>>    2. The output that you would like to see.
>>
>>
>> Have a look at this article of mine  Technical Analysis of the latest UK
>> House Price Index, Deploying Modern tools
>> <https://www.linkedin.com/pulse/technical-analysis-latest-uk-house-price-index-modern-mich/>
>>
>>
>> The PySpark code and windowing functions are here
>> <https://github.com/michTalebzadeh/DataScience/blob/datascience/src/workoutYearlyAveragePricesAll.py>
>>
>>
>> HTH
>>
>>
>>
>> On Mon, 19 Dec 2022 at 16:44, Oliver Ruebenacker <
>> oliv...@broadinstitute.org> wrote:
>>
>>>
>>>      Hello,
>>>
>>>   Thank you for the response!
>>>
>>>   I can think of two ways to get the largest city by country, but both
>>> seem to be inefficient:
>>>
>>>   (1) I could group by country, sort each group by population, add the
>>> row number within each group, and then retain only cities with a row number
>>> equal to 1. But it seems wasteful to sort everything when I only want the
>>> largest of each country
>>>
>>>   (2) I could group by country, get the maximum city population for each
>>> country, join that with the original data frame, and then retain only
>>> cities with population equal to the maximum population in the country. But
>>> that seems also expensive because I need to join.
>>>
>>>   Am I missing something?
>>>
>>>   Thanks!
>>>
>>>      Best, Oliver
>>>
>>> On Mon, Dec 19, 2022 at 10:59 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> In Spark you can use windowing functions
>>>> <https://sparkbyexamples.com/spark/spark-sql-window-functions/> to
>>>> achieve this
>>>>
>>>> HTH
>>>>
>>>>
>>>>
>>>> On Mon, 19 Dec 2022 at 15:28, Oliver Ruebenacker <
>>>> oliv...@broadinstitute.org> wrote:
>>>>
>>>>>
>>>>>      Hello,
>>>>>
>>>>>   How can I retain from each group only the row for which one value is
>>>>> the maximum of the group? For example, imagine a DataFrame containing all
>>>>> major cities in the world, with three columns: (1) City name (2) Country
>>>>> (3) population. How would I get a DataFrame that only contains the largest
>>>>> city in each country? Thanks!
>>>>>
>>>>>      Best, Oliver
>>>>>
>>>>> --
>>>>> Oliver Ruebenacker, Ph.D. (he)
>>>>> Senior Software Engineer, Knowledge Portal Network <http://kp4cd.org/>,
>>>>> Flannick Lab <http://www.flannicklab.org/>, Broad Institute
>>>>> <http://www.broadinstitute.org/>
>>>>>
>>>>
>>>
>>>
>>
>
>
#!/usr/bin/env python3
import findspark
findspark.init()
from pyspark.sql import SparkSession

def spark_session(appName):
    # build (or reuse) a Hive-enabled Spark session
    return SparkSession.builder \
        .appName(appName) \
        .enableHiveSupport() \
        .getOrCreate()

def main():
    appName = "CITIES"
    spark = spark_session(appName)
    # get the sample
    csv_file = "hdfs://rhes75:9000/data/stg/test/cities.csv"
    # read the CSV file from HDFS into a Spark DataFrame
    listing_df = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load(csv_file)
    listing_df.printSchema()
    print(f"""\n Reading from Hive table {csv_file}\n""")
    listing_df.show(100, False)
    print(f"""\n Reading from temp table temp created on listing_df\n""")
    listing_df.createOrReplaceTempView("temp")
    # rank cities within each country, largest population first, and keep rank 1
    sqltext = """
    SELECT country, city
    FROM
    (
    SELECT
              country
            , city
            , population
            , DENSE_RANK() OVER (PARTITION BY country ORDER BY population DESC) AS rnk
    FROM temp
    )
    WHERE rnk = 1
    ORDER BY population DESC
    """
    spark.sql(sqltext).show()

if __name__ == '__main__':
    main()
