Re: [PySpark] Getting the best row from each group

2022-12-21 Thread Oliver Ruebenacker
Wow, thank you so much!

On Wed, Dec 21, 2022 at 10:27 AM Mich Talebzadeh 
wrote:

> OK let us try this
>
> 1) we have a csv file as below called cities.csv
>
> country,city,population
> Germany,Berlin,3520031
> Germany,Hamburg,1787408
> Germany,Munich,1450381
> Turkey,Ankara,4587558
> Turkey,Istanbul,14025646
> Turkey,Izmir,2847691
> United States,Chicago IL,2670406
> United States,Los Angeles CA,4085014
> United States,New York City NY,8622357
>
> 2) Put this in HDFS as below
>
> hdfs dfs -put cities.csv /data/stg/test
>
> Read it into a dataframe in PySpark as below
>
> csv_file = "hdfs://rhes75:9000/data/stg/test/cities.csv"
> # read the csv file from HDFS into a Spark dataframe
> listing_df = spark.read.format("com.databricks.spark.csv") \
>     .option("inferSchema", "true") \
>     .option("header", "true") \
>     .load(csv_file)
> listing_df.printSchema()
> print(f"""\n Reading from csv file {csv_file}\n""")
> listing_df.show(100, False)
>
> 3) create a Spark temp table from the dataframe. I call it temp
>
>  print(f"""\n Reading from temp table temp created on listing_df\n""")
>  listing_df.createOrReplaceTempView("temp")
>
> 4) use standard sql with windowing to get the result out
>
>  sqltext = """
> SELECT country, city
> FROM
> (
>   SELECT
>     country AS country
>   , city AS city
>   , DENSE_RANK() OVER (PARTITION BY country ORDER BY population DESC) AS RANK
>   , MAX(population) OVER (PARTITION BY country) AS population
>   FROM temp
>   GROUP BY country, city, population
> )
> WHERE RANK = 1
> ORDER BY population DESC
> """
> spark.sql(sqltext).show()
>
> 5) let us test it
>
> root
>  |-- country: string (nullable = true)
>  |-- city: string (nullable = true)
>  |-- population: double (nullable = true)
>
>
>  Reading from csv file hdfs://rhes75:9000/data/stg/test/cities.csv
>
> +-------------+----------------+-----------+
> |country      |city            |population |
> +-------------+----------------+-----------+
> |Germany      |Berlin          |3520031.0  |
> |Germany      |Hamburg         |1787408.0  |
> |Germany      |Munich          |1450381.0  |
> |Turkey       |Ankara          |4587558.0  |
> |Turkey       |Istanbul        |1.4025646E7|
> |Turkey       |Izmir           |2847691.0  |
> |United States|Chicago IL      |2670406.0  |
> |United States|Los Angeles CA  |4085014.0  |
> |United States|New York City NY|8622357.0  |
> +-------------+----------------+-----------+
>
>
>  Reading from temp table temp created on listing_df
>
> +-------------+----------------+
> |      country|            city|
> +-------------+----------------+
> |       Turkey|        Istanbul|
> |United States|New York City NY|
> |      Germany|          Berlin|
> +-------------+----------------+
>
> The code is attached
>
> I am sure it can be improved.
>
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 20 Dec 2022 at 20:35, Oliver Ruebenacker <
> oliv...@broadinstitute.org> wrote:
>
>>
>>  Hello,
>>
>>   Let's say the data is like this:
>>
>> +---------------+-------------------+------------+
>> | country       | city              | population |
>> +---------------+-------------------+------------+
>> | Germany       | Berlin            | 3520031    |
>> | Germany       | Hamburg           | 1787408    |
>> | Germany       | Munich            | 1450381    |
>> | Turkey        | Ankara            | 4587558    |
>> | Turkey        | Istanbul          | 14025646   |
>> | Turkey        | Izmir             | 2847691    |
>> | United States | Chicago, IL       | 2670406    |
>> | United States | Los Angeles, CA   | 4085014    |
>> | United States | New York City, NY | 8622357    |
>> +---------------+-------------------+------------+
>>
>> I want to get the largest city in each country:
>>
>> +---------------+-------------------+
>> | country       | city              |
>> +---------------+-------------------+
>> | Germany       | Berlin            |
>> | Turkey        | Istanbul          |
>> | United States | New York City, NY |
>> +---------------+-------------------+
>>
>> Thanks!
>>
>>  Best, Oliver
>>
>> On Tue, Dec 20, 2022 at 5:52 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Windowing functions were invented to avoid doing lengthy group by etc.
>>>
>>> As usual there is a lot of heat but little light
>>>
>>> Please provide:
>>>
>>>
>>>1. Sample input. I gather this data is stored in some csv, tsv,
>>>table format
>>>    2. The output that you would like to see.

Re: [PySpark] Getting the best row from each group

2022-12-21 Thread Mich Talebzadeh
OK let us try this

1) we have a csv file as below called cities.csv

country,city,population
Germany,Berlin,3520031
Germany,Hamburg,1787408
Germany,Munich,1450381
Turkey,Ankara,4587558
Turkey,Istanbul,14025646
Turkey,Izmir,2847691
United States,Chicago IL,2670406
United States,Los Angeles CA,4085014
United States,New York City NY,8622357

2) Put this in HDFS as below

hdfs dfs -put cities.csv /data/stg/test

Read it into a dataframe in PySpark as below

csv_file = "hdfs://rhes75:9000/data/stg/test/cities.csv"
# read the csv file from HDFS into a Spark dataframe
listing_df = spark.read.format("com.databricks.spark.csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load(csv_file)
listing_df.printSchema()
print(f"""\n Reading from csv file {csv_file}\n""")
listing_df.show(100, False)

3) create a Spark temp table from the dataframe. I call it temp

 print(f"""\n Reading from temp table temp created on listing_df\n""")
 listing_df.createOrReplaceTempView("temp")

4) use standard sql with windowing to get the result out

 sqltext = """
SELECT country, city
FROM
(
  SELECT
    country AS country
  , city AS city
  , DENSE_RANK() OVER (PARTITION BY country ORDER BY population DESC) AS RANK
  , MAX(population) OVER (PARTITION BY country) AS population
  FROM temp
  GROUP BY country, city, population
)
WHERE RANK = 1
ORDER BY population DESC
"""
spark.sql(sqltext).show()

5) let us test it

root
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- population: double (nullable = true)


 Reading from csv file hdfs://rhes75:9000/data/stg/test/cities.csv

+-------------+----------------+-----------+
|country      |city            |population |
+-------------+----------------+-----------+
|Germany      |Berlin          |3520031.0  |
|Germany      |Hamburg         |1787408.0  |
|Germany      |Munich          |1450381.0  |
|Turkey       |Ankara          |4587558.0  |
|Turkey       |Istanbul        |1.4025646E7|
|Turkey       |Izmir           |2847691.0  |
|United States|Chicago IL      |2670406.0  |
|United States|Los Angeles CA  |4085014.0  |
|United States|New York City NY|8622357.0  |
+-------------+----------------+-----------+


 Reading from temp table temp created on listing_df

+-------------+----------------+
|      country|            city|
+-------------+----------------+
|       Turkey|        Istanbul|
|United States|New York City NY|
|      Germany|          Berlin|
+-------------+----------------+

The code is attached

I am sure it can be improved.
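
As one possible refinement, here is a minimal PySpark DataFrame-API sketch of
the same window idea: rank the cities within each country by population,
largest first, and keep rank 1. It assumes the listing_df dataframe loaded
above:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# rank cities within each country by population, largest first
w = Window.partitionBy("country").orderBy(F.col("population").desc())

largest_cities = (listing_df
                  .withColumn("rank", F.dense_rank().over(w))
                  .filter(F.col("rank") == 1)   # keep only the top-ranked city per country
                  .select("country", "city", "population")
                  .orderBy(F.col("population").desc()))

largest_cities.show(truncate=False)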



   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 20 Dec 2022 at 20:35, Oliver Ruebenacker 
wrote:

>
>  Hello,
>
>   Let's say the data is like this:
>
> +---------------+-------------------+------------+
> | country       | city              | population |
> +---------------+-------------------+------------+
> | Germany       | Berlin            | 3520031    |
> | Germany       | Hamburg           | 1787408    |
> | Germany       | Munich            | 1450381    |
> | Turkey        | Ankara            | 4587558    |
> | Turkey        | Istanbul          | 14025646   |
> | Turkey        | Izmir             | 2847691    |
> | United States | Chicago, IL       | 2670406    |
> | United States | Los Angeles, CA   | 4085014    |
> | United States | New York City, NY | 8622357    |
> +---------------+-------------------+------------+
>
> I want to get the largest city in each country:
>
> +---------------+-------------------+
> | country       | city              |
> +---------------+-------------------+
> | Germany       | Berlin            |
> | Turkey        | Istanbul          |
> | United States | New York City, NY |
> +---------------+-------------------+
>
> Thanks!
>
>  Best, Oliver
>
> On Tue, Dec 20, 2022 at 5:52 AM Mich Talebzadeh 
> wrote:
>
>> Hi,
>>
>> Windowing functions were invented to avoid doing lengthy group by etc.
>>
>> As usual there is a lot of heat but little light
>>
>> Please provide:
>>
>>
>>1. Sample input. I gather this data is stored in some csv, tsv, table
>>format
>>2. The output that you would like to see.
>>
>>
>> Have a look at this article of mine  Technical Analysis of the latest UK
>> House Price Index, Deploying Modern tools
>> 
>>
>>
>> The PySpark code and windowing functions are here
>> 

Re: [PySpark] Getting the best row from each group

2022-12-20 Thread Artemis User
Try this one:  "select country, city, max(population) from your_table 
group by country"


Please note this returns a table of three columns, instead of two. This 
is a standard SQL query, and supported by Spark as well.
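
For a grouped aggregation that keeps the city paired with its maximum
population, Spark also offers max_by (available since Spark 3.0). A short
sketch, assuming a SparkSession named spark and a temp view called your_table
as in the query above:

# max_by(city, population) returns the city from the row with the largest population per group
best_cities = spark.sql("""
    SELECT country,
           max_by(city, population) AS city,
           MAX(population)          AS population
    FROM your_table
    GROUP BY country
""")
best_cities.show(truncate=False)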


On 12/20/22 3:35 PM, Oliver Ruebenacker wrote:


 Hello,

  Let's say the data is like this:

+---------------+-------------------+------------+
| country       | city              | population |
+---------------+-------------------+------------+
| Germany       | Berlin            | 3520031    |
| Germany       | Hamburg           | 1787408    |
| Germany       | Munich            | 1450381    |
| Turkey        | Ankara            | 4587558    |
| Turkey        | Istanbul          | 14025646   |
| Turkey        | Izmir             | 2847691    |
| United States | Chicago, IL       | 2670406    |
| United States | Los Angeles, CA   | 4085014    |
| United States | New York City, NY | 8622357    |
+---------------+-------------------+------------+

I want to get the largest city in each country:

+---------------+-------------------+
| country       | city              |
+---------------+-------------------+
| Germany       | Berlin            |
| Turkey        | Istanbul          |
| United States | New York City, NY |
+---------------+-------------------+

Thanks!

 Best, Oliver

On Tue, Dec 20, 2022 at 5:52 AM Mich Talebzadeh 
 wrote:


Hi,

Windowing functions were invented to avoid doing lengthy group by etc.

As usual there is a lot of heat but little light

Please provide:

 1. Sample input. I gather this data is stored in some csv, tsv,
table format
 2. The output that you would like to see.


Have a look at this article of mine Technical Analysis of the
latest UK House Price Index, Deploying Modern tools




The PySpark code and windowing functions are here




HTH


view my Linkedin profile



https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* Use it at your own risk. Any and all responsibility
for any loss, damage or destruction of data or any other property
which may arise from relying on this email's technical content is
explicitly disclaimed. The author will in no case be liable for
any monetary damages arising from such loss, damage or destruction.



On Mon, 19 Dec 2022 at 16:44, Oliver Ruebenacker
 wrote:


 Hello,

  Thank you for the response!

  I can think of two ways to get the largest city by country,
but both seem to be inefficient:

  (1) I could group by country, sort each group by population,
add the row number within each group, and then retain only
cities with a row number equal to 1. But it seems wasteful to
sort everything when I only want the largest of each country

  (2) I could group by country, get the maximum city
population for each country, join that with the original data
frame, and then retain only cities with population equal to
the maximum population in the country. But that seems also
expensive because I need to join.

  Am I missing something?

  Thanks!

 Best, Oliver

On Mon, Dec 19, 2022 at 10:59 AM Mich Talebzadeh
 wrote:

In Spark you can use windowing functions to achieve this

HTH


view my Linkedin profile



https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* Use it at your own risk. Any and all
responsibility for any loss, damage or destruction of data
or any other property which may arise from relying on this
email's technical content is explicitly disclaimed. The
author will in no case be liable for any monetary damages
arising from such loss, damage or destruction.



On Mon, 19 Dec 2022 at 15:28, Oliver Ruebenacker
 wrote:


 Hello,

  How can I retain from each group only the row for
which one value is the maximum of the group? For
example, imagine a DataFrame containing all major
cities in the world, with three columns: (1) City name
(2) Country (3) population. How would I get a
DataFrame that only contains the largest city in each
country? Thanks!

 Best, Oliver

-- 
Oliver Ruebenacker, Ph.D. (he)

Senior Software Engineer, Knowledge Portal Network
, Flannick Lab
   

Re: [PySpark] Getting the best row from each group

2022-12-20 Thread Bjørn Jørgensen
https://github.com/apache/spark/pull/39134

On Tue, Dec 20, 2022 at 22:42, Oliver Ruebenacker <
oliv...@broadinstitute.org> wrote:

> Thank you for the suggestion. This would, however, involve converting my
> Dataframe to an RDD (and back later), which involves additional costs.
>
> On Tue, Dec 20, 2022 at 7:30 AM Raghavendra Ganesh <
> raghavendr...@gmail.com> wrote:
>
>> You can groupBy(country) and use the mapPartitions method, in which you can
>> iterate over all rows keeping two variables for maxPopulationSoFar and the
>> corresponding city, then return the city with the max population.
>> I think, as others suggested, it may be possible to use Bucketing; it
>> would give a more friendly SQL'ish way of doing it but may not be the best
>> in performance as it needs to order/sort.
>> --
>> Raghavendra
>>
>>
>> On Mon, Dec 19, 2022 at 8:57 PM Oliver Ruebenacker <
>> oliv...@broadinstitute.org> wrote:
>>
>>>
>>>  Hello,
>>>
>>>   How can I retain from each group only the row for which one value is
>>> the maximum of the group? For example, imagine a DataFrame containing all
>>> major cities in the world, with three columns: (1) City name (2) Country
>>> (3) population. How would I get a DataFrame that only contains the largest
>>> city in each country? Thanks!
>>>
>>>  Best, Oliver
>>>
>>> --
>>> Oliver Ruebenacker, Ph.D. (he)
>>> Senior Software Engineer, Knowledge Portal Network , 
>>> Flannick
>>> Lab , Broad Institute
>>> 
>>>
>>
>
> --
> Oliver Ruebenacker, Ph.D. (he)
> Senior Software Engineer, Knowledge Portal Network , 
> Flannick
> Lab , Broad Institute
> 
>


Re: [PySpark] Getting the best row from each group

2022-12-20 Thread Oliver Ruebenacker
Thank you for the suggestion. This would, however, involve converting my
Dataframe to an RDD (and back later), which involves additional costs.

On Tue, Dec 20, 2022 at 7:30 AM Raghavendra Ganesh 
wrote:

> You can groupBy(country) and use the mapPartitions method, in which you can
> iterate over all rows keeping two variables for maxPopulationSoFar and the
> corresponding city, then return the city with the max population.
> I think, as others suggested, it may be possible to use Bucketing; it would
> give a more friendly SQL'ish way of doing it but may not be the best in
> performance as it needs to order/sort.
> --
> Raghavendra
>
>
> On Mon, Dec 19, 2022 at 8:57 PM Oliver Ruebenacker <
> oliv...@broadinstitute.org> wrote:
>
>>
>>  Hello,
>>
>>   How can I retain from each group only the row for which one value is
>> the maximum of the group? For example, imagine a DataFrame containing all
>> major cities in the world, with three columns: (1) City name (2) Country
>> (3) population. How would I get a DataFrame that only contains the largest
>> city in each country? Thanks!
>>
>>  Best, Oliver
>>
>> --
>> Oliver Ruebenacker, Ph.D. (he)
>> Senior Software Engineer, Knowledge Portal Network , 
>> Flannick
>> Lab , Broad Institute
>> 
>>
>

-- 
Oliver Ruebenacker, Ph.D. (he)
Senior Software Engineer, Knowledge Portal Network
, Flannick
Lab , Broad Institute



Re: [PySpark] Getting the best row from each group

2022-12-20 Thread Oliver Ruebenacker
 Hello,

  Let's say the data is like this:

+---------------+-------------------+------------+
| country       | city              | population |
+---------------+-------------------+------------+
| Germany       | Berlin            | 3520031    |
| Germany       | Hamburg           | 1787408    |
| Germany       | Munich            | 1450381    |
| Turkey        | Ankara            | 4587558    |
| Turkey        | Istanbul          | 14025646   |
| Turkey        | Izmir             | 2847691    |
| United States | Chicago, IL       | 2670406    |
| United States | Los Angeles, CA   | 4085014    |
| United States | New York City, NY | 8622357    |
+---------------+-------------------+------------+

I want to get the largest city in each country:

+---------------+-------------------+
| country       | city              |
+---------------+-------------------+
| Germany       | Berlin            |
| Turkey        | Istanbul          |
| United States | New York City, NY |
+---------------+-------------------+

Thanks!

 Best, Oliver

On Tue, Dec 20, 2022 at 5:52 AM Mich Talebzadeh 
wrote:

> Hi,
>
> Windowing functions were invented to avoid doing lengthy group by etc.
>
> As usual there is a lot of heat but little light
>
> Please provide:
>
>
>1. Sample input. I gather this data is stored in some csv, tsv, table
>format
>2. The output that you would like to see.
>
>
> Have a look at this article of mine  Technical Analysis of the latest UK
> House Price Index, Deploying Modern tools
> 
>
>
> The PySpark code and windowing functions are here
> 
>
>
> HTH
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 19 Dec 2022 at 16:44, Oliver Ruebenacker <
> oliv...@broadinstitute.org> wrote:
>
>>
>>  Hello,
>>
>>   Thank you for the response!
>>
>>   I can think of two ways to get the largest city by country, but both
>> seem to be inefficient:
>>
>>   (1) I could group by country, sort each group by population, add the
>> row number within each group, and then retain only cities with a row number
>> equal to 1. But it seems wasteful to sort everything when I only want the
>> largest of each country
>>
>>   (2) I could group by country, get the maximum city population for each
>> country, join that with the original data frame, and then retain only
>> cities with population equal to the maximum population in the country. But
>> that seems also expensive because I need to join.
>>
>>   Am I missing something?
>>
>>   Thanks!
>>
>>  Best, Oliver
>>
>> On Mon, Dec 19, 2022 at 10:59 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> In Spark you can use windowing functions to achieve this
>>>
>>> HTH
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Mon, 19 Dec 2022 at 15:28, Oliver Ruebenacker <
>>> oliv...@broadinstitute.org> wrote:
>>>

  Hello,

   How can I retain from each group only the row for which one value is
 the maximum of the group? For example, imagine a DataFrame containing all
 major cities in the world, with three columns: (1) City name (2) Country
 (3) population. How would I get a DataFrame that only contains the largest
 city in each country? Thanks!

  Best, Oliver

 --
 Oliver Ruebenacker, Ph.D. (he)
 Senior Software Engineer, Knowledge Portal Network ,
 Flannick Lab , Broad Institute
 

>>>
>>
>> --
>> Oliver Ruebenacker, Ph.D. (he)
>> Senior Software Engineer, Knowledge Portal Network , 
>> Flannick
>> Lab , Broad Institute
>> 
>>
>

-- 
Oliver Ruebenacker, Ph.D. (he)
Senior Software Engineer, Knowledge Portal Network

Re: [PySpark] Getting the best row from each group

2022-12-20 Thread Raghavendra Ganesh
You can groupBy(country) and use the mapPartitions method, in which you can
iterate over all rows keeping two variables for maxPopulationSoFar and the
corresponding city, then return the city with the max population.
I think, as others suggested, it may be possible to use Bucketing; it would
give a more friendly SQL'ish way of doing it but may not be the best in
performance as it needs to order/sort.
--
Raghavendra
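
A rough PySpark sketch of this keep-the-running-maximum idea, written with
reduceByKey over (country, (population, city)) pairs instead of an explicit
per-partition loop; this is an illustrative variant, not the exact code
suggested above, and df is assumed to be a dataframe with columns country,
city and population:

pairs = df.rdd.map(lambda r: (r["country"], (r["population"], r["city"])))

# keep whichever (population, city) pair has the larger population per country
largest = pairs.reduceByKey(lambda a, b: a if a[0] >= b[0] else b)

result = largest.map(lambda kv: (kv[0], kv[1][1], kv[1][0])) \
                .toDF(["country", "city", "population"])
result.show(truncate=False)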


On Mon, Dec 19, 2022 at 8:57 PM Oliver Ruebenacker <
oliv...@broadinstitute.org> wrote:

>
>  Hello,
>
>   How can I retain from each group only the row for which one value is the
> maximum of the group? For example, imagine a DataFrame containing all major
> cities in the world, with three columns: (1) City name (2) Country (3)
> population. How would I get a DataFrame that only contains the largest city
> in each country? Thanks!
>
>  Best, Oliver
>
> --
> Oliver Ruebenacker, Ph.D. (he)
> Senior Software Engineer, Knowledge Portal Network , 
> Flannick
> Lab , Broad Institute
> 
>


Re: [PySpark] Getting the best row from each group

2022-12-20 Thread Mich Talebzadeh
Hi,

Windowing functions were invented to avoid doing lengthy group by etc.

As usual there is a lot of heat but little light

Please provide:


   1. Sample input. I gather this data is stored in some csv, tsv, table
   format
   2. The output that you would like to see.


Have a look at this article of mine  Technical Analysis of the latest UK
House Price Index, Deploying Modern tools



The PySpark code and windowing functions are here



HTH


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 19 Dec 2022 at 16:44, Oliver Ruebenacker 
wrote:

>
>  Hello,
>
>   Thank you for the response!
>
>   I can think of two ways to get the largest city by country, but both
> seem to be inefficient:
>
>   (1) I could group by country, sort each group by population, add the row
> number within each group, and then retain only cities with a row number
> equal to 1. But it seems wasteful to sort everything when I only want the
> largest of each country
>
>   (2) I could group by country, get the maximum city population for each
> country, join that with the original data frame, and then retain only
> cities with population equal to the maximum population in the country. But
> that seems also expensive because I need to join.
>
>   Am I missing something?
>
>   Thanks!
>
>  Best, Oliver
>
> On Mon, Dec 19, 2022 at 10:59 AM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> In Spark you can use windowing functions to achieve this
>>
>> HTH
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Mon, 19 Dec 2022 at 15:28, Oliver Ruebenacker <
>> oliv...@broadinstitute.org> wrote:
>>
>>>
>>>  Hello,
>>>
>>>   How can I retain from each group only the row for which one value is
>>> the maximum of the group? For example, imagine a DataFrame containing all
>>> major cities in the world, with three columns: (1) City name (2) Country
>>> (3) population. How would I get a DataFrame that only contains the largest
>>> city in each country? Thanks!
>>>
>>>  Best, Oliver
>>>
>>> --
>>> Oliver Ruebenacker, Ph.D. (he)
>>> Senior Software Engineer, Knowledge Portal Network , 
>>> Flannick
>>> Lab , Broad Institute
>>> 
>>>
>>
>
> --
> Oliver Ruebenacker, Ph.D. (he)
> Senior Software Engineer, Knowledge Portal Network , 
> Flannick
> Lab , Broad Institute
> 
>


Re: [PySpark] Getting the best row from each group

2022-12-19 Thread Bjørn Jørgensen
Post an example dataframe and what you want the result to look like.

On Mon, Dec 19, 2022 at 20:36, Oliver Ruebenacker <
oliv...@broadinstitute.org> wrote:

> Thank you, that is an interesting idea. Instead of finding the maximum
> population, we are finding the maximum (population, city name) tuple.
>
> On Mon, Dec 19, 2022 at 2:10 PM Bjørn Jørgensen 
> wrote:
>
>> We have the pandas API on Spark, which is very good.
>>
>> from pyspark import pandas as ps
>>
>> You can use pdf = df.pandas_api()
>> Where df is your pyspark dataframe.
>>
>>
>> [image: image.png]
>>
>> Does this help you?
>>
>> df.groupby(['Country'])[['Population', 'City']].max()
>>
>> On Mon, Dec 19, 2022 at 18:22, Patrick Tucci wrote:
>>
>>> Window functions don't work like traditional GROUP BYs. They allow you
>>> to partition data and pull any relevant column, whether it's used in the
>>> partition or not.
>>>
>>> I'm not sure what the syntax is for PySpark, but the standard SQL would
>>> be something like this:
>>>
>>> WITH InputData AS
>>> (
>>>   SELECT 'USA' Country, 'New York' City, 900 Population
>>>   UNION
>>>   SELECT 'USA' Country, 'Miami', 620 Population
>>>   UNION
>>>   SELECT 'Ukraine' Country, 'Kyiv', 300 Population
>>>   UNION
>>>   SELECT 'Ukraine' Country, 'Kharkiv', 140 Population
>>> )
>>>
>>>  SELECT *, ROW_NUMBER() OVER(PARTITION BY Country ORDER BY Population
>>> DESC) PopulationRank
>>>  FROM InputData;
>>>
>>> Results would be something like this:
>>>
>>> Country    City        Population    PopulationRank
>>> Ukraine    Kyiv        300           1
>>> Ukraine    Kharkiv     140           2
>>> USA        New York    900           1
>>> USA        Miami       620           2
>>>
>>> Which you could further filter in another CTE or subquery where
>>> PopulationRank = 1.
>>>
>>> As I mentioned, I'm not sure how this translates into PySpark, but
>>> that's the general concept in SQL.
>>>
>>> On Mon, Dec 19, 2022 at 12:01 PM Oliver Ruebenacker <
>>> oliv...@broadinstitute.org> wrote:
>>>
 If we only wanted to know the biggest population, max function would
 suffice. The problem is I also want the name of the city with the biggest
 population.

 On Mon, Dec 19, 2022 at 11:58 AM Sean Owen  wrote:

> As Mich says, isn't this just max by population partitioned by country
> in a window function?
>
> On Mon, Dec 19, 2022, 9:45 AM Oliver Ruebenacker <
> oliv...@broadinstitute.org> wrote:
>
>>
>>  Hello,
>>
>>   Thank you for the response!
>>
>>   I can think of two ways to get the largest city by country, but
>> both seem to be inefficient:
>>
>>   (1) I could group by country, sort each group by population, add
>> the row number within each group, and then retain only cities with a row
>> number equal to 1. But it seems wasteful to sort everything when I only
>> want the largest of each country
>>
>>   (2) I could group by country, get the maximum city population for
>> each country, join that with the original data frame, and then retain 
>> only
>> cities with population equal to the maximum population in the country. 
>> But
>> that seems also expensive because I need to join.
>>
>>   Am I missing something?
>>
>>   Thanks!
>>
>>  Best, Oliver
>>
>> On Mon, Dec 19, 2022 at 10:59 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> In Spark you can use windowing functions to achieve this
>>>
>>> HTH
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>> for any loss, damage or destruction of data or any other property which 
>>> may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary 
>>> damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Mon, 19 Dec 2022 at 15:28, Oliver Ruebenacker <
>>> oliv...@broadinstitute.org> wrote:
>>>

  Hello,

   How can I retain from each group only the row for which one value
 is the maximum of the group? For example, imagine a DataFrame 
 containing
 all major cities in the world, with three columns: (1) City name (2)
 Country (3) population. How would I get a DataFrame that only contains 
 the
 largest city in each country? Thanks!

  Best, Oliver

 --
 Oliver Ruebenacker, Ph.D. (he)
 

Re: [PySpark] Getting the best row from each group

2022-12-19 Thread Oliver Ruebenacker
Thank you, that is an interesting idea. Instead of finding the maximum
population, we are finding the maximum (population, city name) tuple.

On Mon, Dec 19, 2022 at 2:10 PM Bjørn Jørgensen 
wrote:

> We have the pandas API on Spark, which is very good.
>
> from pyspark import pandas as ps
>
> You can use pdf = df.pandas_api()
> Where df is your pyspark dataframe.
>
>
> [image: image.png]
>
> Does this help you?
>
> df.groupby(['Country'])[['Population', 'City']].max()
>
> On Mon, Dec 19, 2022 at 18:22, Patrick Tucci wrote:
>
>> Window functions don't work like traditional GROUP BYs. They allow you to
>> partition data and pull any relevant column, whether it's used in the
>> partition or not.
>>
>> I'm not sure what the syntax is for PySpark, but the standard SQL would
>> be something like this:
>>
>> WITH InputData AS
>> (
>>   SELECT 'USA' Country, 'New York' City, 900 Population
>>   UNION
>>   SELECT 'USA' Country, 'Miami', 620 Population
>>   UNION
>>   SELECT 'Ukraine' Country, 'Kyiv', 300 Population
>>   UNION
>>   SELECT 'Ukraine' Country, 'Kharkiv', 140 Population
>> )
>>
>>  SELECT *, ROW_NUMBER() OVER(PARTITION BY Country ORDER BY Population
>> DESC) PopulationRank
>>  FROM InputData;
>>
>> Results would be something like this:
>>
>> Country    City        Population    PopulationRank
>> Ukraine    Kyiv        300           1
>> Ukraine    Kharkiv     140           2
>> USA        New York    900           1
>> USA        Miami       620           2
>>
>> Which you could further filter in another CTE or subquery where
>> PopulationRank = 1.
>>
>> As I mentioned, I'm not sure how this translates into PySpark, but that's
>> the general concept in SQL.
>>
>> On Mon, Dec 19, 2022 at 12:01 PM Oliver Ruebenacker <
>> oliv...@broadinstitute.org> wrote:
>>
>>> If we only wanted to know the biggest population, max function would
>>> suffice. The problem is I also want the name of the city with the biggest
>>> population.
>>>
>>> On Mon, Dec 19, 2022 at 11:58 AM Sean Owen  wrote:
>>>
 As Mich says, isn't this just max by population partitioned by country
 in a window function?

 On Mon, Dec 19, 2022, 9:45 AM Oliver Ruebenacker <
 oliv...@broadinstitute.org> wrote:

>
>  Hello,
>
>   Thank you for the response!
>
>   I can think of two ways to get the largest city by country, but both
> seem to be inefficient:
>
>   (1) I could group by country, sort each group by population, add the
> row number within each group, and then retain only cities with a row 
> number
> equal to 1. But it seems wasteful to sort everything when I only want the
> largest of each country
>
>   (2) I could group by country, get the maximum city population for
> each country, join that with the original data frame, and then retain only
> cities with population equal to the maximum population in the country. But
> that seems also expensive because I need to join.
>
>   Am I missing something?
>
>   Thanks!
>
>  Best, Oliver
>
> On Mon, Dec 19, 2022 at 10:59 AM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> In Spark you can use windowing functions to achieve this
>>
>> HTH
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>> for any loss, damage or destruction of data or any other property which 
>> may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Mon, 19 Dec 2022 at 15:28, Oliver Ruebenacker <
>> oliv...@broadinstitute.org> wrote:
>>
>>>
>>>  Hello,
>>>
>>>   How can I retain from each group only the row for which one value
>>> is the maximum of the group? For example, imagine a DataFrame containing
>>> all major cities in the world, with three columns: (1) City name (2)
>>> Country (3) population. How would I get a DataFrame that only contains 
>>> the
>>> largest city in each country? Thanks!
>>>
>>>  Best, Oliver
>>>
>>> --
>>> Oliver Ruebenacker, Ph.D. (he)
>>> Senior Software Engineer, Knowledge Portal Network
>>> , Flannick Lab , Broad
>>> Institute 
>>>
>>
>
> --
> Oliver Ruebenacker, Ph.D. (he)
> Senior Software Engineer, Knowledge Portal Network ,

Re: [PySpark] Getting the best row from each group

2022-12-19 Thread Bjørn Jørgensen
We have the pandas API on Spark, which is very good.

from pyspark import pandas as ps

You can use pdf = df.pandas_api()
Where df is your pyspark dataframe.


[image: image.png]

Does this help you?

df.groupby(['Country'])[['Population', 'City']].max()

On Mon, Dec 19, 2022 at 18:22, Patrick Tucci wrote:

> Window functions don't work like traditional GROUP BYs. They allow you to
> partition data and pull any relevant column, whether it's used in the
> partition or not.
>
> I'm not sure what the syntax is for PySpark, but the standard SQL would be
> something like this:
>
> WITH InputData AS
> (
>   SELECT 'USA' Country, 'New York' City, 900 Population
>   UNION
>   SELECT 'USA' Country, 'Miami', 620 Population
>   UNION
>   SELECT 'Ukraine' Country, 'Kyiv', 300 Population
>   UNION
>   SELECT 'Ukraine' Country, 'Kharkiv', 140 Population
> )
>
>  SELECT *, ROW_NUMBER() OVER(PARTITION BY Country ORDER BY Population
> DESC) PopulationRank
>  FROM InputData;
>
> Results would be something like this:
>
> Country    City        Population    PopulationRank
> Ukraine    Kyiv        300           1
> Ukraine    Kharkiv     140           2
> USA        New York    900           1
> USA        Miami       620           2
>
> Which you could further filter in another CTE or subquery where
> PopulationRank = 1.
>
> As I mentioned, I'm not sure how this translates into PySpark, but that's
> the general concept in SQL.
>
> On Mon, Dec 19, 2022 at 12:01 PM Oliver Ruebenacker <
> oliv...@broadinstitute.org> wrote:
>
>> If we only wanted to know the biggest population, max function would
>> suffice. The problem is I also want the name of the city with the biggest
>> population.
>>
>> On Mon, Dec 19, 2022 at 11:58 AM Sean Owen  wrote:
>>
>>> As Mich says, isn't this just max by population partitioned by country
>>> in a window function?
>>>
>>> On Mon, Dec 19, 2022, 9:45 AM Oliver Ruebenacker <
>>> oliv...@broadinstitute.org> wrote:
>>>

  Hello,

   Thank you for the response!

   I can think of two ways to get the largest city by country, but both
 seem to be inefficient:

   (1) I could group by country, sort each group by population, add the
 row number within each group, and then retain only cities with a row number
 equal to 1. But it seems wasteful to sort everything when I only want the
 largest of each country

   (2) I could group by country, get the maximum city population for
 each country, join that with the original data frame, and then retain only
 cities with population equal to the maximum population in the country. But
 that seems also expensive because I need to join.

   Am I missing something?

   Thanks!

  Best, Oliver

 On Mon, Dec 19, 2022 at 10:59 AM Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> In Spark you can use windowing functions to achieve this
>
> HTH
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for
> any loss, damage or destruction of data or any other property which may
> arise from relying on this email's technical content is explicitly
> disclaimed. The author will in no case be liable for any monetary damages
> arising from such loss, damage or destruction.
>
>
>
>
> On Mon, 19 Dec 2022 at 15:28, Oliver Ruebenacker <
> oliv...@broadinstitute.org> wrote:
>
>>
>>  Hello,
>>
>>   How can I retain from each group only the row for which one value
>> is the maximum of the group? For example, imagine a DataFrame containing
>> all major cities in the world, with three columns: (1) City name (2)
>> Country (3) population. How would I get a DataFrame that only contains 
>> the
>> largest city in each country? Thanks!
>>
>>  Best, Oliver
>>
>> --
>> Oliver Ruebenacker, Ph.D. (he)
>> Senior Software Engineer, Knowledge Portal Network
>> , Flannick Lab , Broad
>> Institute 
>>
>

 --
 Oliver Ruebenacker, Ph.D. (he)
 Senior Software Engineer, Knowledge Portal Network ,
 Flannick Lab , Broad Institute
 

>>>
>>
>> --
>> Oliver Ruebenacker, Ph.D. (he)
>> Senior Software Engineer, Knowledge Portal Network , 
>> Flannick
>> Lab , Broad Institute
>> 
>>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 

Re: [PySpark] Getting the best row from each group

2022-12-19 Thread Patrick Tucci
Window functions don't work like traditional GROUP BYs. They allow you to
partition data and pull any relevant column, whether it's used in the
partition or not.

I'm not sure what the syntax is for PySpark, but the standard SQL would be
something like this:

WITH InputData AS
(
  SELECT 'USA' Country, 'New York' City, 900 Population
  UNION
  SELECT 'USA' Country, 'Miami', 620 Population
  UNION
  SELECT 'Ukraine' Country, 'Kyiv', 300 Population
  UNION
  SELECT 'Ukraine' Country, 'Kharkiv', 140 Population
)

 SELECT *, ROW_NUMBER() OVER(PARTITION BY Country ORDER BY Population DESC)
PopulationRank
 FROM InputData;

Results would be something like this:

Country    City        Population    PopulationRank
Ukraine    Kyiv        300           1
Ukraine    Kharkiv     140           2
USA        New York    900           1
USA        Miami       620           2

Which you could further filter in another CTE or subquery where
PopulationRank = 1.

As I mentioned, I'm not sure how this translates into PySpark, but that's
the general concept in SQL.
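
For what it's worth, one way this might translate into the PySpark DataFrame
API (a sketch, assuming a dataframe df with columns Country, City and
Population like the CTE above):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# same idea as ROW_NUMBER() OVER (PARTITION BY Country ORDER BY Population DESC)
w = Window.partitionBy("Country").orderBy(F.col("Population").desc())

ranked = df.withColumn("PopulationRank", F.row_number().over(w))

# equivalent of filtering on PopulationRank = 1 in a further CTE or subquery
largest = ranked.filter(F.col("PopulationRank") == 1)
largest.show()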

On Mon, Dec 19, 2022 at 12:01 PM Oliver Ruebenacker <
oliv...@broadinstitute.org> wrote:

> If we only wanted to know the biggest population, max function would
> suffice. The problem is I also want the name of the city with the biggest
> population.
>
> On Mon, Dec 19, 2022 at 11:58 AM Sean Owen  wrote:
>
>> As Mich says, isn't this just max by population partitioned by country in
>> a window function?
>>
>> On Mon, Dec 19, 2022, 9:45 AM Oliver Ruebenacker <
>> oliv...@broadinstitute.org> wrote:
>>
>>>
>>>  Hello,
>>>
>>>   Thank you for the response!
>>>
>>>   I can think of two ways to get the largest city by country, but both
>>> seem to be inefficient:
>>>
>>>   (1) I could group by country, sort each group by population, add the
>>> row number within each group, and then retain only cities with a row number
>>> equal to 1. But it seems wasteful to sort everything when I only want the
>>> largest of each country
>>>
>>>   (2) I could group by country, get the maximum city population for each
>>> country, join that with the original data frame, and then retain only
>>> cities with population equal to the maximum population in the country. But
>>> that seems also expensive because I need to join.
>>>
>>>   Am I missing something?
>>>
>>>   Thanks!
>>>
>>>  Best, Oliver
>>>
>>> On Mon, Dec 19, 2022 at 10:59 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 In Spark you can use windowing functions to achieve this

 HTH


view my Linkedin profile
 


  https://en.everybodywiki.com/Mich_Talebzadeh



 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical content is explicitly
 disclaimed. The author will in no case be liable for any monetary damages
 arising from such loss, damage or destruction.




 On Mon, 19 Dec 2022 at 15:28, Oliver Ruebenacker <
 oliv...@broadinstitute.org> wrote:

>
>  Hello,
>
>   How can I retain from each group only the row for which one value is
> the maximum of the group? For example, imagine a DataFrame containing all
> major cities in the world, with three columns: (1) City name (2) Country
> (3) population. How would I get a DataFrame that only contains the largest
> city in each country? Thanks!
>
>  Best, Oliver
>
> --
> Oliver Ruebenacker, Ph.D. (he)
> Senior Software Engineer, Knowledge Portal Network ,
> Flannick Lab , Broad Institute
> 
>

>>>
>>> --
>>> Oliver Ruebenacker, Ph.D. (he)
>>> Senior Software Engineer, Knowledge Portal Network , 
>>> Flannick
>>> Lab , Broad Institute
>>> 
>>>
>>
>
> --
> Oliver Ruebenacker, Ph.D. (he)
> Senior Software Engineer, Knowledge Portal Network , 
> Flannick
> Lab , Broad Institute
> 
>


Re: [PySpark] Getting the best row from each group

2022-12-19 Thread Oliver Ruebenacker
If we only wanted to know the biggest population, max function would
suffice. The problem is I also want the name of the city with the biggest
population.

On Mon, Dec 19, 2022 at 11:58 AM Sean Owen  wrote:

> As Mich says, isn't this just max by population partitioned by country in
> a window function?
>
> On Mon, Dec 19, 2022, 9:45 AM Oliver Ruebenacker <
> oliv...@broadinstitute.org> wrote:
>
>>
>>  Hello,
>>
>>   Thank you for the response!
>>
>>   I can think of two ways to get the largest city by country, but both
>> seem to be inefficient:
>>
>>   (1) I could group by country, sort each group by population, add the
>> row number within each group, and then retain only cities with a row number
>> equal to 1. But it seems wasteful to sort everything when I only want the
>> largest of each country
>>
>>   (2) I could group by country, get the maximum city population for each
>> country, join that with the original data frame, and then retain only
>> cities with population equal to the maximum population in the country. But
>> that seems also expensive because I need to join.
>>
>>   Am I missing something?
>>
>>   Thanks!
>>
>>  Best, Oliver
>>
>> On Mon, Dec 19, 2022 at 10:59 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> In Spark you can use windowing functions to achieve this
>>>
>>> HTH
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Mon, 19 Dec 2022 at 15:28, Oliver Ruebenacker <
>>> oliv...@broadinstitute.org> wrote:
>>>

  Hello,

   How can I retain from each group only the row for which one value is
 the maximum of the group? For example, imagine a DataFrame containing all
 major cities in the world, with three columns: (1) City name (2) Country
 (3) population. How would I get a DataFrame that only contains the largest
 city in each country? Thanks!

  Best, Oliver

 --
 Oliver Ruebenacker, Ph.D. (he)
 Senior Software Engineer, Knowledge Portal Network ,
 Flannick Lab , Broad Institute
 

>>>
>>
>> --
>> Oliver Ruebenacker, Ph.D. (he)
>> Senior Software Engineer, Knowledge Portal Network , 
>> Flannick
>> Lab , Broad Institute
>> 
>>
>

-- 
Oliver Ruebenacker, Ph.D. (he)
Senior Software Engineer, Knowledge Portal Network
, Flannick
Lab , Broad Institute



Re: [PySpark] Getting the best row from each group

2022-12-19 Thread Sean Owen
As Mich says, isn't this just max by population partitioned by country in a
window function?
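
A short sketch of that suggestion (assuming a dataframe df with columns
country, city and population): attach the per-country maximum with a window,
then keep the matching rows:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# per-country maximum population, attached to every row of that country
w = Window.partitionBy("country")
with_max = df.withColumn("max_pop", F.max("population").over(w))

# keep the row(s) whose population equals the country's maximum
largest = with_max.filter(F.col("population") == F.col("max_pop")).drop("max_pop")
largest.show()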

On Mon, Dec 19, 2022, 9:45 AM Oliver Ruebenacker 
wrote:

>
>  Hello,
>
>   Thank you for the response!
>
>   I can think of two ways to get the largest city by country, but both
> seem to be inefficient:
>
>   (1) I could group by country, sort each group by population, add the row
> number within each group, and then retain only cities with a row number
> equal to 1. But it seems wasteful to sort everything when I only want the
> largest of each country
>
>   (2) I could group by country, get the maximum city population for each
> country, join that with the original data frame, and then retain only
> cities with population equal to the maximum population in the country. But
> that seems also expensive because I need to join.
>
>   Am I missing something?
>
>   Thanks!
>
>  Best, Oliver
>
> On Mon, Dec 19, 2022 at 10:59 AM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> In Spark you can use windowing functions to achieve this
>>
>> HTH
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Mon, 19 Dec 2022 at 15:28, Oliver Ruebenacker <
>> oliv...@broadinstitute.org> wrote:
>>
>>>
>>>  Hello,
>>>
>>>   How can I retain from each group only the row for which one value is
>>> the maximum of the group? For example, imagine a DataFrame containing all
>>> major cities in the world, with three columns: (1) City name (2) Country
>>> (3) population. How would I get a DataFrame that only contains the largest
>>> city in each country? Thanks!
>>>
>>>  Best, Oliver
>>>
>>> --
>>> Oliver Ruebenacker, Ph.D. (he)
>>> Senior Software Engineer, Knowledge Portal Network , 
>>> Flannick
>>> Lab , Broad Institute
>>> 
>>>
>>
>
> --
> Oliver Ruebenacker, Ph.D. (he)
> Senior Software Engineer, Knowledge Portal Network , 
> Flannick
> Lab , Broad Institute
> 
>


Re: [PySpark] Getting the best row from each group

2022-12-19 Thread Oliver Ruebenacker
 Hello,

  Thank you for the response!

  I can think of two ways to get the largest city by country, but both seem
to be inefficient:

  (1) I could group by country, sort each group by population, add the row
number within each group, and then retain only cities with a row number
equal to 1. But it seems wasteful to sort everything when I only want the
largest of each country

  (2) I could group by country, get the maximum city population for each
country, join that with the original data frame, and then retain only
cities with population equal to the maximum population in the country. But
that seems also expensive because I need to join.

  Am I missing something?

  Thanks!

 Best, Oliver
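
For reference, a rough sketch of approach (2) as described above, assuming a
dataframe df with columns country, city and population:

from pyspark.sql import functions as F

# per-country maximum population, then join back to recover the city
max_pop = df.groupBy("country").agg(F.max("population").alias("population"))

largest = df.join(max_pop, on=["country", "population"], how="inner") \
            .select("country", "city", "population")
largest.show()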

On Mon, Dec 19, 2022 at 10:59 AM Mich Talebzadeh 
wrote:

> In Spark you can use windowing functions to achieve this
>
> HTH
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 19 Dec 2022 at 15:28, Oliver Ruebenacker <
> oliv...@broadinstitute.org> wrote:
>
>>
>>  Hello,
>>
>>   How can I retain from each group only the row for which one value is
>> the maximum of the group? For example, imagine a DataFrame containing all
>> major cities in the world, with three columns: (1) City name (2) Country
>> (3) population. How would I get a DataFrame that only contains the largest
>> city in each country? Thanks!
>>
>>  Best, Oliver
>>
>> --
>> Oliver Ruebenacker, Ph.D. (he)
>> Senior Software Engineer, Knowledge Portal Network , 
>> Flannick
>> Lab , Broad Institute
>> 
>>
>

-- 
Oliver Ruebenacker, Ph.D. (he)
Senior Software Engineer, Knowledge Portal Network
, Flannick
Lab , Broad Institute



Re: [PySpark] Getting the best row from each group

2022-12-19 Thread Mich Talebzadeh
In Spark you can use windowing functions to achieve this

HTH


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 19 Dec 2022 at 15:28, Oliver Ruebenacker 
wrote:

>
>  Hello,
>
>   How can I retain from each group only the row for which one value is the
> maximum of the group? For example, imagine a DataFrame containing all major
> cities in the world, with three columns: (1) City name (2) Country (3)
> population. How would I get a DataFrame that only contains the largest city
> in each country? Thanks!
>
>  Best, Oliver
>
> --
> Oliver Ruebenacker, Ph.D. (he)
> Senior Software Engineer, Knowledge Portal Network , 
> Flannick
> Lab , Broad Institute
> 
>


Re: Issues getting Apache Spark

2022-05-26 Thread Apostolos N. Papadopoulos
How can we help if we do not know what the problem is? What error are you
getting, and at which step?


Please give us more info to be able to help you. Spark installation on
Linux/Windows is easy if you follow the guidelines exactly.

Regards,

Apostolos


On 26/5/22 22:19, Martin, Michael wrote:


Hello,

I’m writing to request assistance in getting Apache Spark on my 
laptop. I’ve followed instructions telling me to get Java, Python, 
Hadoop, Winutils, and Spark itself. I’ve followed instructions 
illustrating how to set my environment variables. For some reason, I 
still cannot get Spark to work on my laptop.


Michael Martin


--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email:papad...@csd.auth.gr
twitter: @papadopoulos_ap
web:http://datalab.csd.auth.gr/~apostol


Re: Message getting lost in Kafka + Spark Streaming

2017-06-01 Thread Vikash Pareek
Thanks Sidney for your response,

To check if all the messages are processed, I used an accumulator and also
added a print statement for debugging.


val accum = ssc.sparkContext.accumulator(0, "Debug Accumulator")
...
val mappedDataStream = dataStream.map(_._2)
mappedDataStream.foreachRDD { rdd =>
  ...
  partition.foreach { row =>
    if (debug) println(row.mkString)
    val keyedMessage = new KeyedMessage[String,
      String](props.getProperty("outTopicUnharmonized"), null, row.toString())
    producer.send(keyedMessage)
    println("Messages sent to Kafka: " + keyedMessage.message)
    accum += 1
  }
  // hack, should be done with the flush
  Thread.sleep(1000)
  producer.close()
  print("Accumulator's value is: " + accum)

And I am seeing every message received by the stream in the output of
println("Messages sent to Kafka: " + keyedMessage.message), and the
accumulator value is also the same as the number of incoming messages.



Best Regards,


[image: InfoObjects Inc.] 
Vikash Pareek
Team Lead  *InfoObjects Inc.*
Big Data Analytics

m: +91 8800206898 a: E5, Jhalana Institutionall Area, Jaipur, Rajasthan
302004
w: www.linkedin.com/in/pvikash e: vikash.par...@infoobjects.com




On Thu, Jun 1, 2017 at 11:24 AM, Sidney Feiner 
wrote:

> Are you sure that every message gets processed? It could be that some
> messages failed to pass the decoder.
> And during the processing, are you maybe putting the events into a map?
> That way, events with the same key could overwrite each other, and then
> you'll have fewer final events.
>
> -Original Message-
> From: Vikash Pareek [mailto:vikash.par...@infoobjects.com]
> Sent: Tuesday, May 30, 2017 4:00 PM
> To: user@spark.apache.org
> Subject: Message getting lost in Kafka + Spark Streaming
>
> I am facing an issue related to Spark Streaming with Kafka; my use case is
> as follows:
> 1. A Spark Streaming (DirectStream) application reads data/messages from a
> Kafka topic and processes them.
> 2. On the basis of the processed message, the app writes the processed
> message to different Kafka topics, e.g. if the message is harmonized it is
> written to the harmonized topic, else to the unharmonized topic.
>
> The problem is that during the streaming we are somehow losing some
> messages, i.e. not all of the incoming messages are written to the
> harmonized or unharmonized topics.
> For example, if the app receives 30 messages in one batch then sometimes it
> writes all the messages to the output topics (this is the expected
> behaviour) but sometimes it writes only 27 (3 messages are lost; this
> number can change).
>
> Versions are as follows:
> Spark 1.6.0
> Kafka 0.9
>
> Kafka topic configuration is as follows:
> # of brokers: 3
> # replication factor: 3
> # of partitions: 3
>
> Following are the properties we are using for kafka:
> *  val props = new Properties()
>   props.put("metadata.broker.list",
> properties.getProperty("metadataBrokerList"))
>   props.put("auto.offset.reset",
> properties.getProperty("autoOffsetReset"))
>   props.put("group.id", properties.getProperty("group.id"))
>   props.put("serializer.class", "kafka.serializer.StringEncoder")
>   props.put("outTopicHarmonized",
> properties.getProperty("outletKafkaTopicHarmonized"))
>   props.put("outTopicUnharmonized",
> properties.getProperty("outletKafkaTopicUnharmonized"))
>   props.put("acks", "all");
>   props.put("retries", "5");
>   props.put("request.required.acks", "-1")
> *
> Following is the piece of code where we are writing processed messages to
> Kafka:
> *  val schemaRdd2 = finalHarmonizedDF.toJSON
>
>   schemaRdd2.foreachPartition { partition =>
> val producerConfig = new ProducerConfig(props)
> val producer = new Producer[String, String](producerConfig)
>
> partition.foreach { row =>
>   if (debug) println(row.mkString)
>   val keyedMessage = new KeyedMessage[String,
> String](props.getProperty("outTopicHarmonized"),
> null, row.toString())
>   producer.send(keyedMessage)
>
> }
> //hack, should be done with the flush
> Thread.sleep(1000)
> producer.close()
>   }
> *
> We explicitly added sleep(1000) for testing purposes.
> But this is also not solving the problem :(
>
> Any suggestion would be appreciated.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Message-getting-lost-in-Kafka-Spark-
> Streaming-tp28719.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


RE: Message getting lost in Kafka + Spark Streaming

2017-05-31 Thread Sidney Feiner
Are you sure that every message gets processed? It could be that some messages
failed to pass the decoder.
And during the processing, are you maybe putting the events into a map? That
way, events with the same key could overwrite each other, and then you'll
have fewer final events.

-Original Message-
From: Vikash Pareek [mailto:vikash.par...@infoobjects.com] 
Sent: Tuesday, May 30, 2017 4:00 PM
To: user@spark.apache.org
Subject: Message getting lost in Kafka + Spark Streaming

I am facing an issue related to Spark Streaming with Kafka; my use case is as
follows:
1. A Spark Streaming (DirectStream) application reads data/messages from a
Kafka topic and processes them.
2. On the basis of the processed message, the app writes the processed message
to different Kafka topics, e.g. if the message is harmonized it is written to
the harmonized topic, else to the unharmonized topic.

The problem is that during the streaming we are somehow losing some messages,
i.e. not all of the incoming messages are written to the harmonized or
unharmonized topics.
For example, if the app receives 30 messages in one batch then sometimes it
writes all the messages to the output topics (this is the expected behaviour)
but sometimes it writes only 27 (3 messages are lost; this number can change).

Versions are as follows:
Spark 1.6.0
Kafka 0.9

Kafka topic configuration is as follows:
# of brokers: 3
# replication factor: 3
# of partitions: 3
 
Following are the properties we are using for kafka:
*  val props = new Properties()
  props.put("metadata.broker.list",
properties.getProperty("metadataBrokerList"))
  props.put("auto.offset.reset",
properties.getProperty("autoOffsetReset"))
  props.put("group.id", properties.getProperty("group.id"))
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  props.put("outTopicHarmonized",
properties.getProperty("outletKafkaTopicHarmonized"))
  props.put("outTopicUnharmonized",
properties.getProperty("outletKafkaTopicUnharmonized"))
  props.put("acks", "all");
  props.put("retries", "5");
  props.put("request.required.acks", "-1")
*
Following is the piece of code where we are writing processed messages to
Kafka:
*  val schemaRdd2 = finalHarmonizedDF.toJSON
 
  schemaRdd2.foreachPartition { partition =>
val producerConfig = new ProducerConfig(props)
val producer = new Producer[String, String](producerConfig)
 
partition.foreach { row =>
  if (debug) println(row.mkString)
  val keyedMessage = new KeyedMessage[String, 
String](props.getProperty("outTopicHarmonized"),
null, row.toString())
  producer.send(keyedMessage)
 
}
//hack, should be done with the flush
Thread.sleep(1000)
producer.close()
  }
*
We explicitly added sleep(1000) for testing purposes.
But this is also not solving the problem :(
 
Any suggestion would be appreciated.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Message-getting-lost-in-Kafka-Spark-Streaming-tp28719.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Message getting lost in Kafka + Spark Streaming

2017-05-30 Thread Cody Koeninger
First thing I noticed, you should be using a singleton kafka producer,
not recreating one every partition.  It's threadsafe.
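
For reference, a minimal sketch of the singleton-producer pattern, using the same
old-style kafka.producer API as the code below (the broker list and topic name here
are placeholders, not taken from the original job):

  import java.util.Properties
  import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

  // One producer per executor JVM, created lazily on first use,
  // instead of a new Producer for every partition.
  object KafkaSink {
    lazy val producer: Producer[String, String] = {
      val props = new Properties()
      props.put("metadata.broker.list", "broker1:9092,broker2:9092") // placeholder brokers
      props.put("serializer.class", "kafka.serializer.StringEncoder")
      props.put("request.required.acks", "-1")
      new Producer[String, String](new ProducerConfig(props))
    }
  }

  // Inside the job, reuse the shared producer instead of building one per partition:
  schemaRdd2.foreachPartition { partition =>
    partition.foreach { row =>
      KafkaSink.producer.send(
        new KeyedMessage[String, String]("harmonizedTopic", null, row)) // placeholder topic
    }
  }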

On Tue, May 30, 2017 at 7:59 AM, Vikash Pareek
 wrote:
> I am facing an issue related to spark streaming with kafka, my use case is as
> follow:
> 1. Spark streaming(DirectStream) application reading data/messages from
> kafka topic and process it
> 2. On the basis of proccessed message, app will write proccessed message to
> different kafka topics
> for e.g. if messgese is harmonized then write to harmonized topic else
> unharmonized topic
>
> the problem is that during the streaming somehow we are lossing some
> messaged i.e all the incoming messages are not written to harmonized or
> unharmonized topics.
> for e.g. if app received 30 messages in one batch then sometime it write all
> the messges to output topics(this is expected behaviour) but sometimes it
> writes only 27 (3 messages are lost, this number can change).
>
> Versions as follow:
> Spark 1.6.0
> Kafka 0.9
>
> Kafka topics confguration is as follow:
> # of brokers: 3
> # replicxation factor: 3
> # of paritions: 3
>
> Following are the properties we are using for kafka:
> *  val props = new Properties()
>   props.put("metadata.broker.list",
> properties.getProperty("metadataBrokerList"))
>   props.put("auto.offset.reset",
> properties.getProperty("autoOffsetReset"))
>   props.put("group.id", properties.getProperty("group.id"))
>   props.put("serializer.class", "kafka.serializer.StringEncoder")
>   props.put("outTopicHarmonized",
> properties.getProperty("outletKafkaTopicHarmonized"))
>   props.put("outTopicUnharmonized",
> properties.getProperty("outletKafkaTopicUnharmonized"))
>   props.put("acks", "all");
>   props.put("retries", "5");
>   props.put("request.required.acks", "-1")
> *
> Following is the piece of code where we are writing proccessed messges to
> kafka:
> *  val schemaRdd2 = finalHarmonizedDF.toJSON
>
>   schemaRdd2.foreachPartition { partition =>
> val producerConfig = new ProducerConfig(props)
> val producer = new Producer[String, String](producerConfig)
>
> partition.foreach { row =>
>   if (debug) println(row.mkString)
>   val keyedMessage = new KeyedMessage[String,
> String](props.getProperty("outTopicHarmonized"),
> null, row.toString())
>   producer.send(keyedMessage)
>
> }
> //hack, should be done with the flush
> Thread.sleep(1000)
> producer.close()
>   }
> *
> We explicitely added sleep(1000) for testing purpose.
> But this is also not solving the problem :(
>
> Any suggestion would be appreciated.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Message-getting-lost-in-Kafka-Spark-Streaming-tp28719.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Getting data from MongoDB in JAVA

2016-06-13 Thread Asfandyar Ashraf Malik
Yes, it was a dependency issue; I was using incompatible versions.
The command *mvn dependency:tree -Dverbose* helped me fix this.

Cheers



Asfandyar Ashraf Malik


Mobile: +49 15751174449

Email: asfand...@kreditech.com



Kreditech Holding SSL GmbH

Ludwig-Erhard-Straße 1, 20459 Hamburg, Germany

2016-06-12 18:36 GMT+02:00 Ted Yu :

> What's the value of spark.version ?
>
> Do you know which version of Spark mongodb connector 0.10.3 was built
> against ?
>
> You can use the following command to find out:
> mvn dependency:tree
>
> Maybe the Spark version you use is different from what mongodb connector
> was built against.
>
> On Fri, Jun 10, 2016 at 2:50 AM, Asfandyar Ashraf Malik <
> asfand...@kreditech.com> wrote:
>
>> Hi,
>> I did not notice that I put it twice.
>> I changed that and ran my program but it still gives the same error:
>>
>> java.lang.NoSuchMethodError:
>> org.apache.spark.sql.catalyst.ScalaReflection$.typeOfObject()Lscala/PartialFunction;
>>
>>
>> Cheers
>>
>>
>>
>> 2016-06-10 11:47 GMT+02:00 Alonso Isidoro Roman :
>>
>>> why *spark-mongodb_2.11 dependency is written twice in pom.xml?*
>>>
>>> Alonso Isidoro Roman
>>> [image: https://]about.me/alonso.isidoro.roman
>>>
>>> 
>>>
>>> 2016-06-10 11:39 GMT+02:00 Asfandyar Ashraf Malik <
>>> asfand...@kreditech.com>:
>>>
 Hi,
 I am using Stratio library to get MongoDB to work with Spark but I get
 the following error:

 java.lang.NoSuchMethodError:
 org.apache.spark.sql.catalyst.ScalaReflection

 This is my code.

 ---
 *public static void main(String[] args) {*

 *JavaSparkContext sc = new JavaSparkContext("local[*]", "test
 spark-mongodb java"); *
 *SQLContext sqlContext = new SQLContext(sc); *

 *Map options = new HashMap(); *
 *options.put("host", "xyz.mongolab.com:59107
 "); *
 *options.put("database", "heroku_app3525385");*
 *options.put("collection", "datalog");*
 *options.put("credentials", "*,,");*

 *DataFrame df =
 sqlContext.read().format("com.stratio.datasource.mongodb").options(options).load();*
 *df.registerTempTable("datalog"); *
 *df.show();*

 *}*

 ---
 My pom file is as follows:

  **
 **
 *org.apache.spark*
 *spark-core_2.11*
 *${spark.version}*
 **
 **
 *org.apache.spark*
 *spark-catalyst_2.11 *
 *${spark.version}*
 **
 **
 *org.apache.spark*
 *spark-sql_2.11*
 *${spark.version}*
 * *
 **
 *com.stratio.datasource*
 *spark-mongodb_2.11*
 *0.10.3*
 **
 **
 *com.stratio.datasource*
 *spark-mongodb_2.11*
 *0.10.3*
 *jar*
 **
 **


 Regards

>>>
>>>
>>
>


Re: Spark Getting data from MongoDB in JAVA

2016-06-12 Thread Ted Yu
What's the value of spark.version ?

Do you know which version of Spark mongodb connector 0.10.3 was built
against ?

You can use the following command to find out:
mvn dependency:tree

Maybe the Spark version you use is different from what mongodb connector
was built against.

On Fri, Jun 10, 2016 at 2:50 AM, Asfandyar Ashraf Malik <
asfand...@kreditech.com> wrote:

> Hi,
> I did not notice that I put it twice.
> I changed that and ran my program but it still gives the same error:
>
> java.lang.NoSuchMethodError:
> org.apache.spark.sql.catalyst.ScalaReflection$.typeOfObject()Lscala/PartialFunction;
>
>
> Cheers
>
>
>
> 2016-06-10 11:47 GMT+02:00 Alonso Isidoro Roman :
>
>> why *spark-mongodb_2.11 dependency is written twice in pom.xml?*
>>
>> Alonso Isidoro Roman
>> [image: https://]about.me/alonso.isidoro.roman
>>
>> 
>>
>> 2016-06-10 11:39 GMT+02:00 Asfandyar Ashraf Malik <
>> asfand...@kreditech.com>:
>>
>>> Hi,
>>> I am using Stratio library to get MongoDB to work with Spark but I get
>>> the following error:
>>>
>>> java.lang.NoSuchMethodError:
>>> org.apache.spark.sql.catalyst.ScalaReflection
>>>
>>> This is my code.
>>>
>>> ---
>>> *public static void main(String[] args) {*
>>>
>>> *JavaSparkContext sc = new JavaSparkContext("local[*]", "test
>>> spark-mongodb java"); *
>>> *SQLContext sqlContext = new SQLContext(sc); *
>>>
>>> *Map options = new HashMap(); *
>>> *options.put("host", "xyz.mongolab.com:59107
>>> "); *
>>> *options.put("database", "heroku_app3525385");*
>>> *options.put("collection", "datalog");*
>>> *options.put("credentials", "*,,");*
>>>
>>> *DataFrame df =
>>> sqlContext.read().format("com.stratio.datasource.mongodb").options(options).load();*
>>> *df.registerTempTable("datalog"); *
>>> *df.show();*
>>>
>>> *}*
>>>
>>> ---
>>> My pom file is as follows:
>>>
>>>  **
>>> **
>>> *org.apache.spark*
>>> *spark-core_2.11*
>>> *${spark.version}*
>>> **
>>> **
>>> *org.apache.spark*
>>> *spark-catalyst_2.11 *
>>> *${spark.version}*
>>> **
>>> **
>>> *org.apache.spark*
>>> *spark-sql_2.11*
>>> *${spark.version}*
>>> * *
>>> **
>>> *com.stratio.datasource*
>>> *spark-mongodb_2.11*
>>> *0.10.3*
>>> **
>>> **
>>> *com.stratio.datasource*
>>> *spark-mongodb_2.11*
>>> *0.10.3*
>>> *jar*
>>> **
>>> **
>>>
>>>
>>> Regards
>>>
>>
>>
>


Re: Spark Getting data from MongoDB in JAVA

2016-06-12 Thread vaquar khan
Hi Asfandyar,

*NoSuchMethodError* in Java means you compiled against one version of the code
and are executing against a different version.

Please make sure that your Java version and the versions of the dependencies you
are adding all target the same Java version.

regards,
vaquar khan

On Fri, Jun 10, 2016 at 4:50 AM, Asfandyar Ashraf Malik <
asfand...@kreditech.com> wrote:

> Hi,
> I did not notice that I put it twice.
> I changed that and ran my program but it still gives the same error:
>
> java.lang.NoSuchMethodError:
> org.apache.spark.sql.catalyst.ScalaReflection$.typeOfObject()Lscala/PartialFunction;
>
>
> Cheers
>
>
>
> 2016-06-10 11:47 GMT+02:00 Alonso Isidoro Roman :
>
>> why *spark-mongodb_2.11 dependency is written twice in pom.xml?*
>>
>> Alonso Isidoro Roman
>> [image: https://]about.me/alonso.isidoro.roman
>>
>> 
>>
>> 2016-06-10 11:39 GMT+02:00 Asfandyar Ashraf Malik <
>> asfand...@kreditech.com>:
>>
>>> Hi,
>>> I am using Stratio library to get MongoDB to work with Spark but I get
>>> the following error:
>>>
>>> java.lang.NoSuchMethodError:
>>> org.apache.spark.sql.catalyst.ScalaReflection
>>>
>>> This is my code.
>>>
>>> ---
>>> *public static void main(String[] args) {*
>>>
>>> *JavaSparkContext sc = new JavaSparkContext("local[*]", "test
>>> spark-mongodb java"); *
>>> *SQLContext sqlContext = new SQLContext(sc); *
>>>
>>> *Map options = new HashMap(); *
>>> *options.put("host", "xyz.mongolab.com:59107
>>> "); *
>>> *options.put("database", "heroku_app3525385");*
>>> *options.put("collection", "datalog");*
>>> *options.put("credentials", "*,,");*
>>>
>>> *DataFrame df =
>>> sqlContext.read().format("com.stratio.datasource.mongodb").options(options).load();*
>>> *df.registerTempTable("datalog"); *
>>> *df.show();*
>>>
>>> *}*
>>>
>>> ---
>>> My pom file is as follows:
>>>
>>>  **
>>> **
>>> *org.apache.spark*
>>> *spark-core_2.11*
>>> *${spark.version}*
>>> **
>>> **
>>> *org.apache.spark*
>>> *spark-catalyst_2.11 *
>>> *${spark.version}*
>>> **
>>> **
>>> *org.apache.spark*
>>> *spark-sql_2.11*
>>> *${spark.version}*
>>> * *
>>> **
>>> *com.stratio.datasource*
>>> *spark-mongodb_2.11*
>>> *0.10.3*
>>> **
>>> **
>>> *com.stratio.datasource*
>>> *spark-mongodb_2.11*
>>> *0.10.3*
>>> *jar*
>>> **
>>> **
>>>
>>>
>>> Regards
>>>
>>
>>
>


-- 
Regards,
Vaquar Khan
+91 830-851-1500


Re: Spark Getting data from MongoDB in JAVA

2016-06-10 Thread Asfandyar Ashraf Malik
Hi,
I did not notice that I put it twice.
I changed that and ran my program but it still gives the same error:

java.lang.NoSuchMethodError:
org.apache.spark.sql.catalyst.ScalaReflection$.typeOfObject()Lscala/PartialFunction;


Cheers



2016-06-10 11:47 GMT+02:00 Alonso Isidoro Roman :

> why *spark-mongodb_2.11 dependency is written twice in pom.xml?*
>
> Alonso Isidoro Roman
> [image: https://]about.me/alonso.isidoro.roman
>
> 
>
> 2016-06-10 11:39 GMT+02:00 Asfandyar Ashraf Malik  >:
>
>> Hi,
>> I am using Stratio library to get MongoDB to work with Spark but I get
>> the following error:
>>
>> java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.ScalaReflection
>>
>> This is my code.
>>
>> ---
>> *public static void main(String[] args) {*
>>
>> *JavaSparkContext sc = new JavaSparkContext("local[*]", "test
>> spark-mongodb java"); *
>> *SQLContext sqlContext = new SQLContext(sc); *
>>
>> *Map options = new HashMap(); *
>> *options.put("host", "xyz.mongolab.com:59107
>> "); *
>> *options.put("database", "heroku_app3525385");*
>> *options.put("collection", "datalog");*
>> *options.put("credentials", "*,,");*
>>
>> *DataFrame df =
>> sqlContext.read().format("com.stratio.datasource.mongodb").options(options).load();*
>> *df.registerTempTable("datalog"); *
>> *df.show();*
>>
>> *}*
>>
>> ---
>> My pom file is as follows:
>>
>>  **
>> **
>> *org.apache.spark*
>> *spark-core_2.11*
>> *${spark.version}*
>> **
>> **
>> *org.apache.spark*
>> *spark-catalyst_2.11 *
>> *${spark.version}*
>> **
>> **
>> *org.apache.spark*
>> *spark-sql_2.11*
>> *${spark.version}*
>> * *
>> **
>> *com.stratio.datasource*
>> *spark-mongodb_2.11*
>> *0.10.3*
>> **
>> **
>> *com.stratio.datasource*
>> *spark-mongodb_2.11*
>> *0.10.3*
>> *jar*
>> **
>> **
>>
>>
>> Regards
>>
>
>


Re: Spark Getting data from MongoDB in JAVA

2016-06-10 Thread Alonso Isidoro Roman
Why is the *spark-mongodb_2.11* dependency written twice in pom.xml?

Alonso Isidoro Roman
about.me/alonso.isidoro.roman


2016-06-10 11:39 GMT+02:00 Asfandyar Ashraf Malik :

> Hi,
> I am using Stratio library to get MongoDB to work with Spark but I get the
> following error:
>
> java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.ScalaReflection
>
> This is my code.
>
> ---
> *public static void main(String[] args) {*
>
> *JavaSparkContext sc = new JavaSparkContext("local[*]", "test
> spark-mongodb java"); *
> *SQLContext sqlContext = new SQLContext(sc); *
>
> *Map options = new HashMap(); *
> *options.put("host", "xyz.mongolab.com:59107
> "); *
> *options.put("database", "heroku_app3525385");*
> *options.put("collection", "datalog");*
> *options.put("credentials", "*,,");*
>
> *DataFrame df =
> sqlContext.read().format("com.stratio.datasource.mongodb").options(options).load();*
> *df.registerTempTable("datalog"); *
> *df.show();*
>
> *}*
>
> ---
> My pom file is as follows:
>
>  **
> **
> *org.apache.spark*
> *spark-core_2.11*
> *${spark.version}*
> **
> **
> *org.apache.spark*
> *spark-catalyst_2.11 *
> *${spark.version}*
> **
> **
> *org.apache.spark*
> *spark-sql_2.11*
> *${spark.version}*
> * *
> **
> *com.stratio.datasource*
> *spark-mongodb_2.11*
> *0.10.3*
> **
> **
> *com.stratio.datasource*
> *spark-mongodb_2.11*
> *0.10.3*
> *jar*
> **
> **
>
>
> Regards
>


Re: Error getting response from spark driver rest APIs : java.lang.IncompatibleClassChangeError: Implementing class

2015-12-26 Thread Hokam Singh Chauhan
Hi Rakesh,

Looks like an old version of Jersey is going into the shaded jar.
Add the below dependencies to your shaded jar; that will resolve the
*InvocationTargetException* issue.

jersey-client-1.9
jersey-core-1.9
jersey-json-1.9
jersey-grizzly2-1.9
jersey-guice-1.9
jersey-server-1.9

Regards,
Hokam

On Thu, Dec 17, 2015 at 10:09 AM, ihavethepotential <
ihavethepotent...@gmail.com> wrote:

> Hi all,
>
> I am trying to get the job details for my spark application using a REST
> call to the driver API. I am making a GET request to the following URI
>
> /api/v1/applications/socketEs2/jobs
>
> But getting the following exception:
>
> 015-12-16 19:46:28 qtp1912556493-56 [WARN ] ServletHandler -
> /api/v1/applications/socketEs2/jobs
> java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at
>
> com.sun.jersey.spi.container.servlet.WebComponent.createResourceConfig(WebComponent.java:728)
> at
>
> com.sun.jersey.spi.container.servlet.WebComponent.createResourceConfig(WebComponent.java:678)
> at
>
> com.sun.jersey.spi.container.servlet.WebComponent.init(WebComponent.java:203)
> at
>
> com.sun.jersey.spi.container.servlet.ServletContainer.init(ServletContainer.java:373)
> at
>
> com.sun.jersey.spi.container.servlet.ServletContainer.init(ServletContainer.java:556)
> at javax.servlet.GenericServlet.init(GenericServlet.java:244)
> at
>
> org.spark-project.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:532)
> at
>
> org.spark-project.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:415)
> at
>
> org.spark-project.jetty.servlet.ServletHolder.handle(ServletHolder.java:657)
> at
>
> org.spark-project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501)
> at
>
> org.spark-project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
> at
>
> org.spark-project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428)
> at
>
> org.spark-project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)
> at
>
> org.spark-project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
> at
>
> org.spark-project.jetty.server.handler.GzipHandler.handle(GzipHandler.java:301)
> at
>
> org.spark-project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255)
> at
>
> org.spark-project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
> at org.spark-project.jetty.server.Server.handle(Server.java:370)
> at
>
> org.spark-project.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)
> at
>
> org.spark-project.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:971)
> at
>
> org.spark-project.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1033)
> at
> org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:644)
> at
> org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
> at
>
> org.spark-project.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
> at
>
> org.spark-project.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:667)
> at
>
> org.spark-project.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)
> at
>
> org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
> at
>
> org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IncompatibleClassChangeError: Implementing class
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
> at
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> at 

Re: Master getting down with Memory issue.

2015-09-28 Thread Akhil Das
Depends on the data volume that you are operating on.

Thanks
Best Regards

On Mon, Sep 28, 2015 at 5:12 PM, Saurav Sinha 
wrote:

> Hi Akhil,
>
> My job is creating 47 stages in one cycle and it is running every hour.
> Can you please suggest me what is optimum numbers of stages in spark job.
>
> How can we reduce numbers of stages in spark job.
>
> Thanks,
> Saurav Sinha
>
> On Mon, Sep 28, 2015 at 3:23 PM, Saurav Sinha 
> wrote:
>
>> Hi Akhil,
>>
>> Can you please explaine to me how increasing number of partition (which
>> is thing is worker nodes) will help.
>>
>> As issue is that my master is getting OOM.
>>
>> Thanks,
>> Saurav Sinha
>>
>> On Mon, Sep 28, 2015 at 2:32 PM, Akhil Das 
>> wrote:
>>
>>> This behavior totally depends on the job that you are doing. Usually
>>> increasing the # of partitions will sort out this issue. It would be good
>>> if you can paste the code snippet or explain what type of operations that
>>> you are doing.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Mon, Sep 28, 2015 at 11:37 AM, Saurav Sinha 
>>> wrote:
>>>
 Hi Spark Users,

 I am running some spark jobs which is running every hour.After running
 for 12 hours master is getting killed giving exception as

 *java.lang.OutOfMemoryError: GC overhead limit exceeded*

 It look like there is some memory issue in spark master.
 Spark Master is blocker. Any one please suggest me any thing.


 Same kind of issue I noticed with spark history server.

 In my job I have to monitor if job completed successfully, for that I
 am hitting curl to get status but when no of jobs has increased to >80 apps
 history server start responding with delay.Like it is taking more then 5
 min to respond status of jobs.

 Running spark 1.4.1 in standalone mode on 5 machine cluster.

 Kindly suggest me solution for memory issue it is blocker.

 Thanks,
 Saurav Sinha

 --
 Thanks and Regards,

 Saurav Sinha

 Contact: 9742879062

>>>
>>>
>>
>>
>> --
>> Thanks and Regards,
>>
>> Saurav Sinha
>>
>> Contact: 9742879062
>>
>
>
>
> --
> Thanks and Regards,
>
> Saurav Sinha
>
> Contact: 9742879062
>


Re: Master getting down with Memory issue.

2015-09-28 Thread Saurav Sinha
Hi Akhil,

Can you please explain to me how increasing the number of partitions (which is
a worker-node-side setting) will help?

The issue is that my master is getting an OOM.

Thanks,
Saurav Sinha

On Mon, Sep 28, 2015 at 2:32 PM, Akhil Das 
wrote:

> This behavior totally depends on the job that you are doing. Usually
> increasing the # of partitions will sort out this issue. It would be good
> if you can paste the code snippet or explain what type of operations that
> you are doing.
>
> Thanks
> Best Regards
>
> On Mon, Sep 28, 2015 at 11:37 AM, Saurav Sinha 
> wrote:
>
>> Hi Spark Users,
>>
>> I am running some spark jobs which is running every hour.After running
>> for 12 hours master is getting killed giving exception as
>>
>> *java.lang.OutOfMemoryError: GC overhead limit exceeded*
>>
>> It look like there is some memory issue in spark master.
>> Spark Master is blocker. Any one please suggest me any thing.
>>
>>
>> Same kind of issue I noticed with spark history server.
>>
>> In my job I have to monitor if job completed successfully, for that I am
>> hitting curl to get status but when no of jobs has increased to >80 apps
>> history server start responding with delay.Like it is taking more then 5
>> min to respond status of jobs.
>>
>> Running spark 1.4.1 in standalone mode on 5 machine cluster.
>>
>> Kindly suggest me solution for memory issue it is blocker.
>>
>> Thanks,
>> Saurav Sinha
>>
>> --
>> Thanks and Regards,
>>
>> Saurav Sinha
>>
>> Contact: 9742879062
>>
>
>


-- 
Thanks and Regards,

Saurav Sinha

Contact: 9742879062


Re: Master getting down with Memory issue.

2015-09-28 Thread Akhil Das
This behavior totally depends on the job that you are doing. Usually
increasing the # of partitions will sort out this issue. It would be good
if you could paste a code snippet or explain what types of operations
you are doing.
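
As a minimal sketch of what that looks like (the path and partition counts below
are made-up examples, not taken from Saurav's job):

  // Read with more input splits up front, or reshuffle an existing RDD.
  val raw   = sc.textFile("hdfs:///data/input", 500)  // second argument = minimum number of partitions
  val wider = raw.repartition(1000)                   // or increase partitions later in the pipeline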

Thanks
Best Regards

On Mon, Sep 28, 2015 at 11:37 AM, Saurav Sinha 
wrote:

> Hi Spark Users,
>
> I am running some spark jobs which is running every hour.After running for
> 12 hours master is getting killed giving exception as
>
> *java.lang.OutOfMemoryError: GC overhead limit exceeded*
>
> It look like there is some memory issue in spark master.
> Spark Master is blocker. Any one please suggest me any thing.
>
>
> Same kind of issue I noticed with spark history server.
>
> In my job I have to monitor if job completed successfully, for that I am
> hitting curl to get status but when no of jobs has increased to >80 apps
> history server start responding with delay.Like it is taking more then 5
> min to respond status of jobs.
>
> Running spark 1.4.1 in standalone mode on 5 machine cluster.
>
> Kindly suggest me solution for memory issue it is blocker.
>
> Thanks,
> Saurav Sinha
>
> --
> Thanks and Regards,
>
> Saurav Sinha
>
> Contact: 9742879062
>


Re: Help getting started with Kafka

2015-09-22 Thread Yana Kadiyska
Thanks a lot Cody! I was punting on the decoders by calling count (or
trying to, since my types require a custom decoder) but your sample code is
exactly what I was trying to achieve. The error message threw me off; I will
work on the decoders now.

On Tue, Sep 22, 2015 at 10:50 AM, Cody Koeninger  wrote:

> You need type parameters for the call to createRDD indicating the type of
> the key / value and the decoder to use for each.
>
> See
>
>
> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/BasicRDD.scala
>
> Also, you need to check to see if offsets 0 through 100 are still actually
> present in the kafka logs.
>
> On Tue, Sep 22, 2015 at 9:38 AM, Yana Kadiyska 
> wrote:
>
>> Hi folks, I'm trying to write a simple Spark job that dumps out a Kafka
>> queue into HDFS. Being very new to Kafka, not sure if I'm messing something
>> up on that side...My hope is to read the messages presently in the queue
>> (or at least the first 100 for now)
>>
>> Here is what I have:
>> Kafka side:
>>
>>  ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic ingress 
>> --broker-list IP1:9092,IP2:9092,IP3:9092 --time -1
>> ingress:0:34386
>> ingress:1:34148
>> ingress:2:34300
>>
>> ​
>>
>> On Spark side I'm trying this(1.4.1):
>>
>> bin/spark-shell --jars
>> kafka-clients-0.8.2.0.jar,spark-streaming-kafka_2.10-1.4.1.jar,kafka_2.10-0.8.2.0.jar,metrics-core-2.2.0.ja
>>
>>
>>
>> val brokers="IP1:9092,IP2:9092,IP3:9092" //same as IPs above
>> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>>
>> val offsetRange= (0 to 2).map(part=>OffsetRange.create("ingress",part,0,100))
>> val messages= KafkaUtils.createRDD(sc,kafkaParams,offsetRange.toArray)
>> messages: org.apache.spark.rdd.RDD[(Nothing, Nothing)] = KafkaRDD[1] at RDD 
>> at KafkaRDD.scala:45
>>
>> ​
>>
>> when I try messages.count I get:
>>
>> 15/09/22 14:01:17 ERROR TaskContextImpl: Error in TaskCompletionListener
>> java.lang.NullPointerException
>>  at 
>> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:157)
>>  at 
>> org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
>>  at 
>> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:100)
>>  at 
>> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:100)
>>  at 
>> org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:56)
>>  at 
>> org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:75)
>>  at 
>> org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:73)
>>  at 
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>  at 
>> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:73)
>>  at org.apache.spark.scheduler.Task.run(Task.scala:72)
>>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>  at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>
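
For reference, a minimal sketch of the typed createRDD call Cody points at above
(Spark 1.4.x, assuming String keys and values decoded with StringDecoder):

  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

  val kafkaParams  = Map("metadata.broker.list" -> "IP1:9092,IP2:9092,IP3:9092")
  val offsetRanges = (0 to 2).map(p => OffsetRange.create("ingress", p, 0, 100)).toArray
  // The type parameters give Spark the key/value types and the Decoder for each;
  // per Cody's caveat, offsets 0-100 must still exist in the Kafka logs.
  val messages = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
    sc, kafkaParams, offsetRanges)
  messages.count()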


Re: Problems getting expected results from hbase_inputformat.py

2015-08-10 Thread Eric Bless
Thank you Gen, the changes to HBaseConverters.scala now appear to return
all column qualifiers, as follows -

(u'row1', {u'qualifier': u'a', u'timestamp': u'1438716994027', u'value': 
u'value1', u'columnFamily': u'f1', u'type': u'Put', u'row': u'row1'})
(u'row1', {u'qualifier': u'b', u'timestamp': u'1438717004248', u'value': 
u'value2', u'columnFamily': u'f1', u'type': u'Put', u'row': u'row1'})
(u'row2', {u'qualifier': u'', u'timestamp': u'1438717014529', u'value': 
u'value3', u'columnFamily': u'f1', u'type': u'Put', u'row': u'row2'})
(u'row3', {u'qualifier': u'', u'timestamp': u'1438717022756', u'value': 
u'value4', u'columnFamily': u'f1', u'type': u'Put', u'row': u'row3'})
Just to be clear, you mentioned that Spark updated these two scripts recently.
Which two scripts were you referring to?


 On Friday, August 7, 2015 7:59 PM, gen tang gen.tan...@gmail.com wrote:
   

 Hi,

 In fact, Pyspark use
 org.apache.spark.examples.pythonconverters (./examples/src/main/scala/org/apache/spark/pythonconverters/)
 to transform object of Hbase result to python string. Spark update these two
 scripts recently. However, they are not included in the official release of
 spark. So you are trying to use this new python script with old jar.

 You can clone the newest code of spark from github and build examples jar. Then
 you can get correct result.

 Cheers
 Gen

On Sat, Aug 8, 2015 at 5:03 AM, Eric Bless eric.bl...@yahoo.com.invalid wrote:

I’m having some difficulty getting the desired results from the Spark Python
example hbase_inputformat.py. I’m running with CDH5.4, hbase Version 1.0.0,
Spark v 1.3.0, using Python version 2.6.6.

I followed the example to create a test HBase table. Here’s the data from the
table I created –

hbase(main):001:0 scan 'dev_wx_test'
ROW   COLUMN+CELL
row1  column=f1:a, timestamp=1438716994027, value=value1
row1  column=f1:b, timestamp=1438717004248, value=value2
row2  column=f1:, timestamp=1438717014529, value=value3
row3  column=f1:, timestamp=1438717022756, value=value4
3 row(s) in 0.2620 seconds

When either of these statements are included -
“hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split(\n))”  or
“hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split(\n)).countByValue().items()”
the result is - we only get the following printed; (row1, value2) is not printed:
    ((u'row1', u'value1'), 1)
    ((u'row2', u'value3'), 1)
    ((u'row3', u'value4'), 1)

This looks like similar results to the following post I found -
http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-get-column-family-and-qualifier-names-from-hbase-table-td18613.html#a18650
but it appears the pythonconverter HBaseResultToStringConverter has been
updated since then.

And this problem will be resolved too.

When the statement
“hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split(\n)).mapValues(json.loads)”
is included, the result is –
ValueError: No JSON object could be decoded

**
Here is more info on this from the log –
Traceback (most recent call last):
  File hbase_inputformat.py, line 87, in module
    output = hbase_rdd.collect()
  File /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/rdd.py, line 701, in collect
  File /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/py4j/java_gateway.py, line 538, in __call__
  File /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/py4j/protocol.py, line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o44.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0
(TID 4, stluhdpddev27.monsanto.com):
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/worker.py, line 101, in main
    process()
  File /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/worker.py, line 96, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/serializers.py, line 236, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/rdd.py, line 1807, in lambda
  File /usr/lib64/python2.6/json/__init__.py, line 307, in loads
    return _default_decoder.decode(s)
  File /usr/lib64/python2.6/json/decoder.py, line 319, in decode

Re: Problems getting expected results from hbase_inputformat.py

2015-08-07 Thread gen tang
Hi,

In fact, PySpark uses
org.apache.spark.examples.pythonconverters (./examples/src/main/scala/org/apache/spark/pythonconverters/)
to transform HBase Result objects into Python strings.
Spark updated these two converter scripts recently. However, they are not
included in the official release of Spark, so you are trying to use the new
Python script with an old jar.

You can clone the newest Spark code from GitHub and build the examples jar.
Then you will get the correct result.

Cheers
Gen


On Sat, Aug 8, 2015 at 5:03 AM, Eric Bless eric.bl...@yahoo.com.invalid
wrote:

 I’m having some difficulty getting the desired results from the Spark
 Python example hbase_inputformat.py. I’m running with CDH5.4, hbase Version
 1.0.0, Spark v 1.3.0 Using Python version 2.6.6

 I followed the example to create a test HBase table. Here’s the data from
 the table I created –
 hbase(main):001:0 scan 'dev_wx_test'
 ROW   COLUMN+CELL
 row1 column=f1:a, timestamp=1438716994027, value=value1
 row1 column=f1:b, timestamp=1438717004248, value=value2
 row2 column=f1:, timestamp=1438717014529, value=value3
 row3 column=f1:, timestamp=1438717022756, value=value4
 3 row(s) in 0.2620 seconds

 When either of these statements are included -
 “hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split(\n))”  or
 “hbase_rdd = hbase_rdd.flatMapValues(lambda v:
 v.split(\n)).countByValue().items()” the result is -
 We only get the following printed; (row1, value2) is not printed:
 ((u'row1', u'value1'), 1) ((u'row2', u'value3'), 1)
 ((u'row3', u'value4'), 1)
  This looks like similar results to the following post I found -

 http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-get-column-family-and-qualifier-names-from-hbase-table-td18613.html#a18650
 but it appears the pythonconverter HBaseResultToStringConverter has been
 updated since then.

And this problem will be resolved too.



 When the statement
 “hbase_rdd = hbase_rdd.flatMapValues(lambda v:
 v.split(\n)).mapValues(json.loads)” is included, the result is –
 ValueError: No JSON object could be decoded


 **
 Here is more info on this from the log –
 Traceback (most recent call last):
   File hbase_inputformat.py, line 87, in module
 output = hbase_rdd.collect()
   File
 /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/rdd.py,
 line 701, in collect
   File
 /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/py4j/java_gateway.py,
 line 538, in __call__
   File
 /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/py4j/protocol.py,
 line 300, in get_return_value
 py4j.protocol.Py4JJavaError: An error occurred while calling o44.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage
 1.0 (TID 4, stluhdpddev27.monsanto.com):
 org.apache.spark.api.python.PythonException: Traceback (most recent call
 last):
   File
 /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/worker.py,
 line 101, in main
 process()
   File
 /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/worker.py,
 line 96, in process
 serializer.dump_stream(func(split_index, iterator), outfile)
   File
 /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/serializers.py,
 line 236, in dump_stream
 vs = list(itertools.islice(iterator, batch))
   File
 /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/rdd.py,
 line 1807, in lambda
   File /usr/lib64/python2.6/json/__init__.py, line 307, in loads
 return _default_decoder.decode(s)
   File /usr/lib64/python2.6/json/decoder.py, line 319, in decode
 obj, end = self.raw_decode(s, idx=_w(s, 0).end())
   File /usr/lib64/python2.6/json/decoder.py, line 338, in raw_decode
 raise ValueError(No JSON object could be decoded)
 ValueError: No JSON object could be decoded

 at
 org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135)
 at
 org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:176)
 at
 org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at
 

Re: Not getting event logs = spark 1.3.1

2015-06-16 Thread Tsai Li Ming
Forgot to mention this is on standalone mode.

Is my configuration wrong?

Thanks,
Liming
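
For what it's worth, the same settings can also be set programmatically on the
SparkConf (a sketch only; the directory mirrors the configuration quoted below):

  import org.apache.spark.{SparkConf, SparkContext}

  // Equivalent of the spark.eventLog entries in spark-defaults.conf, set in code.
  val conf = new SparkConf()
    .setAppName("wordcount")
    .set("spark.eventLog.enabled", "true")
    .set("spark.eventLog.dir", "file:/tmp/spark-events")
  val sc = new SparkContext(conf)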

On 15 Jun, 2015, at 11:26 pm, Tsai Li Ming mailingl...@ltsai.com wrote:

 Hi,
 
 I have this in my spark-defaults.conf (same for hdfs):
 spark.eventLog.enabled  true
 spark.eventLog.dir  file:/tmp/spark-events
 spark.history.fs.logDirectory   file:/tmp/spark-events
 
 While the app is running, there is a “.inprogress” directory. However when 
 the job completes, the directory is always empty.
 
 I’m submitting the job like this, using either the Pi or world count examples:
 $ bin/spark-submit 
 /opt/spark-1.4.0-bin-hadoop2.6/examples/src/main/python/wordcount.py 
 
 This used to be working in 1.2.1 and didn’t test 1.3.0.
 
 
 Regards,
 Liming
 
 
 
 
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problem getting program to run on 15TB input

2015-06-09 Thread Arun Luthra
I found that the problem was due to garbage collection in filter(). Using
Hive to do the filter solved the problem.

A lot of other problems went away when I upgraded to Spark 1.2.0, which
compresses various task overhead data (HighlyCompressedMapStatus etc.).

It has been running very, very smoothly with these two changes.

I'm fairly sure that I tried coalesce(); it resulted in tasks that were
too big, but the code has evolved too much to easily double-check it now.
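
As a rough sketch of the "do the filter in Hive first" approach mentioned above
(table, column and output names here are invented, and this uses the Spark 1.3+
HiveContext/DataFrame API for brevity; the actual pipeline will differ):

  import org.apache.spark.sql.hive.HiveContext

  val hiveContext = new HiveContext(sc)
  // Let Hive throw away the ~99% of rows Spark never needs to touch.
  val prefiltered = hiveContext.sql("SELECT id FROM raw_events WHERE keep_flag = 1")
  prefiltered.rdd
    .map(row => (row.getString(0), 1L))
    .reduceByKey(_ + _)
    .saveAsTextFile("hdfs:///output/id_counts")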

On Sat, Jun 6, 2015 at 12:50 AM, Kapil Malik kma...@adobe.com wrote:

  Very interesting and relevant thread for production level usage of spark.



 @Arun, can you kindly confirm if Daniel’s suggestion helped your usecase?



 Thanks,



 Kapil Malik | kma...@adobe.com | 33430 / 8800836581



 *From:* Daniel Mahler [mailto:dmah...@gmail.com]
 *Sent:* 13 April 2015 15:42
 *To:* Arun Luthra
 *Cc:* Aaron Davidson; Paweł Szulc; Burak Yavuz; user@spark.apache.org
 *Subject:* Re: Problem getting program to run on 15TB input



 Sometimes a large number of partitions leads to memory problems.

 Something like



 val rdd1 = sc.textFile(file1).coalesce(500). ...

 val rdd2 = sc.textFile(file2).coalesce(500). ...



 may help.





 On Mon, Mar 2, 2015 at 6:26 PM, Arun Luthra arun.lut...@gmail.com wrote:

 Everything works smoothly if I do the 99%-removal filter in Hive first.
 So, all the baggage from garbage collection was breaking it.



 Is there a way to filter() out 99% of the data without having to garbage
 collect 99% of the RDD?



 On Sun, Mar 1, 2015 at 9:56 AM, Arun Luthra arun.lut...@gmail.com wrote:

 I tried a shorter simper version of the program, with just 1 RDD,
  essentially it is:



 sc.textFile(..., N).map().filter().map( blah = (id,
 1L)).reduceByKey().saveAsTextFile(...)



 Here is a typical GC log trace from one of the yarn container logs:



 54.040: [GC [PSYoungGen: 9176064K-28206K(10704896K)]
 9176064K-28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01,
 real=0.02 secs]

 77.864: [GC [PSYoungGen: 9204270K-150553K(10704896K)]
 9204342K-150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26,
 real=0.04 secs]

 79.485: [GC [PSYoungGen: 9326617K-333519K(10704896K)]
 9326705K-333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28,
 real=0.08 secs]

 92.974: [GC [PSYoungGen: 9509583K-193370K(10704896K)]
 9509679K-193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11,
 real=0.02 secs]

 114.842: [GC [PSYoungGen: 9369434K-123577K(10704896K)]
 9369538K-123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00,
 real=0.02 secs]

 117.277: [GC [PSYoungGen: 9299641K-135459K(11918336K)]
 9299753K-135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25,
 real=0.02 secs]



 So ~9GB is getting GC'ed every few seconds. Which seems like a lot.



 Question: The filter() is removing 99% of the data. Does this 99% of the
 data get GC'ed?



 Now, I was able to finally get to reduceByKey() by reducing the number of
 executor-cores (to 2), based on suggestions at
 http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html
 . This makes everything before reduceByKey() run pretty smoothly.



 I ran this with more executor-memory and less executors (most important
 thing was fewer executor-cores):



 --num-executors 150 \

 --driver-memory 15g \

 --executor-memory 110g \

 --executor-cores 32 \



 But then, reduceByKey() fails with:

 java.lang.OutOfMemoryError: Java heap space









 On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 The Spark UI names the line number and name of the operation (repartition
 in this case) that it is performing. Only if this information is wrong
 (just a possibility), could it have started groupByKey already.



 I will try to analyze the amount of skew in the data by using reduceByKey
 (or simply countByKey) which is relatively inexpensive. For the purposes of
 this algorithm I can simply log and remove keys with huge counts, before
 doing groupByKey.



 On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson ilike...@gmail.com
 wrote:

 All stated symptoms are consistent with GC pressure (other nodes timeout
 trying to connect because of a long stop-the-world), quite possibly due to
 groupByKey. groupByKey is a very expensive operation as it may bring all
 the data for a particular partition into memory (in particular, it cannot
 spill values for a single key, so if you have a single very skewed key you
 can get behavior like this).



 On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

 But groupbykey will repartition according to numer of keys as I understand
 how it works. How do you know that you haven't reached the groupbykey
 phase? Are you using a profiler or do yoi base that assumption only on logs?



 sob., 28 lut 2015, 8:12 PM Arun Luthra użytkownik arun.lut...@gmail.com
 napisał:



 A correction to my first post

RE: Problem getting program to run on 15TB input

2015-06-06 Thread Kapil Malik
Very interesting and relevant thread for production-level usage of Spark.

@Arun, can you kindly confirm if Daniel’s suggestion helped your use case?

Thanks,

Kapil Malik | kma...@adobe.com | 33430 / 8800836581

From: Daniel Mahler [mailto:dmah...@gmail.com]
Sent: 13 April 2015 15:42
To: Arun Luthra
Cc: Aaron Davidson; Paweł Szulc; Burak Yavuz; user@spark.apache.org
Subject: Re: Problem getting program to run on 15TB input

Sometimes a large number of partitions leads to memory problems.
Something like

val rdd1 = sc.textFile(file1).coalesce(500). ...
val rdd2 = sc.textFile(file2).coalesce(500). ...

may help.


On Mon, Mar 2, 2015 at 6:26 PM, Arun Luthra 
arun.lut...@gmail.commailto:arun.lut...@gmail.com wrote:
Everything works smoothly if I do the 99%-removal filter in Hive first. So, all 
the baggage from garbage collection was breaking it.

Is there a way to filter() out 99% of the data without having to garbage 
collect 99% of the RDD?

On Sun, Mar 1, 2015 at 9:56 AM, Arun Luthra 
arun.lut...@gmail.commailto:arun.lut...@gmail.com wrote:
I tried a shorter simper version of the program, with just 1 RDD,  essentially 
it is:

sc.textFile(..., N).map().filter().map( blah = (id, 
1L)).reduceByKey().saveAsTextFile(...)

Here is a typical GC log trace from one of the yarn container logs:

54.040: [GC [PSYoungGen: 9176064K-28206K(10704896K)] 
9176064K-28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01, 
real=0.02 secs]
77.864: [GC [PSYoungGen: 9204270K-150553K(10704896K)] 
9204342K-150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26, 
real=0.04 secs]
79.485: [GC [PSYoungGen: 9326617K-333519K(10704896K)] 
9326705K-333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28, 
real=0.08 secs]
92.974: [GC [PSYoungGen: 9509583K-193370K(10704896K)] 
9509679K-193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11, 
real=0.02 secs]
114.842: [GC [PSYoungGen: 9369434K-123577K(10704896K)] 
9369538K-123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00, 
real=0.02 secs]
117.277: [GC [PSYoungGen: 9299641K-135459K(11918336K)] 
9299753K-135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25, 
real=0.02 secs]

So ~9GB is getting GC'ed every few seconds. Which seems like a lot.

Question: The filter() is removing 99% of the data. Does this 99% of the data 
get GC'ed?

Now, I was able to finally get to reduceByKey() by reducing the number of 
executor-cores (to 2), based on suggestions at 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html
 . This makes everything before reduceByKey() run pretty smoothly.

I ran this with more executor-memory and less executors (most important thing 
was fewer executor-cores):

--num-executors 150 \
--driver-memory 15g \
--executor-memory 110g \
--executor-cores 32 \

But then, reduceByKey() fails with:

java.lang.OutOfMemoryError: Java heap space




On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra 
arun.lut...@gmail.commailto:arun.lut...@gmail.com wrote:
The Spark UI names the line number and name of the operation (repartition in 
this case) that it is performing. Only if this information is wrong (just a 
possibility), could it have started groupByKey already.

I will try to analyze the amount of skew in the data by using reduceByKey (or 
simply countByKey) which is relatively inexpensive. For the purposes of this 
algorithm I can simply log and remove keys with huge counts, before doing 
groupByKey.

On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson 
ilike...@gmail.commailto:ilike...@gmail.com wrote:
All stated symptoms are consistent with GC pressure (other nodes timeout trying 
to connect because of a long stop-the-world), quite possibly due to groupByKey. 
groupByKey is a very expensive operation as it may bring all the data for a 
particular partition into memory (in particular, it cannot spill values for a 
single key, so if you have a single very skewed key you can get behavior like 
this).

On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc 
paul.sz...@gmail.commailto:paul.sz...@gmail.com wrote:

But groupbykey will repartition according to numer of keys as I understand how 
it works. How do you know that you haven't reached the groupbykey phase? Are 
you using a profiler or do yoi base that assumption only on logs?

sob., 28 lut 2015, 8:12 PM Arun Luthra użytkownik 
arun.lut...@gmail.commailto:arun.lut...@gmail.com napisał:

A correction to my first post:

There is also a repartition right before groupByKey to help avoid 
too-many-open-files error:

rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra 
arun.lut...@gmail.commailto:arun.lut...@gmail.com wrote:
The job fails before getting to groupByKey.

I see a lot of timeout errors in the yarn logs, like:

15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1

Re: not getting any mail

2015-05-02 Thread Ted Yu
Looks like there were delays across Apache project mailing lists. 

Emails are coming through now. 



 On May 2, 2015, at 9:14 AM, Jeetendra Gangele gangele...@gmail.com wrote:
 
 Hi All
 
 I am not getting any mail from this community?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problem getting program to run on 15TB input

2015-04-13 Thread Daniel Mahler
Sometimes a large number of partitions leads to memory problems.
Something like

val rdd1 = sc.textFile(file1).coalesce(500). ...
val rdd2 = sc.textFile(file2).coalesce(500). ...

may help.
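
One design note on that suggestion: coalesce and repartition differ in whether they
shuffle (a sketch, reusing the file name above):

  // coalesce(n) narrows partitions without a full shuffle; repartition(n) forces one.
  val narrowed   = sc.textFile("file1").coalesce(500)                  // no shuffle
  val reshuffled = sc.textFile("file1").coalesce(500, shuffle = true)  // same as repartition(500)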


On Mon, Mar 2, 2015 at 6:26 PM, Arun Luthra arun.lut...@gmail.com wrote:

 Everything works smoothly if I do the 99%-removal filter in Hive first.
 So, all the baggage from garbage collection was breaking it.

 Is there a way to filter() out 99% of the data without having to garbage
 collect 99% of the RDD?

 On Sun, Mar 1, 2015 at 9:56 AM, Arun Luthra arun.lut...@gmail.com wrote:

 I tried a shorter simper version of the program, with just 1 RDD,
  essentially it is:

 sc.textFile(..., N).map().filter().map( blah = (id,
 1L)).reduceByKey().saveAsTextFile(...)

 Here is a typical GC log trace from one of the yarn container logs:

 54.040: [GC [PSYoungGen: 9176064K-28206K(10704896K)]
 9176064K-28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01,
 real=0.02 secs]
 77.864: [GC [PSYoungGen: 9204270K-150553K(10704896K)]
 9204342K-150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26,
 real=0.04 secs]
 79.485: [GC [PSYoungGen: 9326617K-333519K(10704896K)]
 9326705K-333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28,
 real=0.08 secs]
 92.974: [GC [PSYoungGen: 9509583K-193370K(10704896K)]
 9509679K-193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11,
 real=0.02 secs]
 114.842: [GC [PSYoungGen: 9369434K-123577K(10704896K)]
 9369538K-123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00,
 real=0.02 secs]
 117.277: [GC [PSYoungGen: 9299641K-135459K(11918336K)]
 9299753K-135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25,
 real=0.02 secs]

 So ~9GB is getting GC'ed every few seconds. Which seems like a lot.

 Question: The filter() is removing 99% of the data. Does this 99% of the
 data get GC'ed?

 Now, I was able to finally get to reduceByKey() by reducing the number of
 executor-cores (to 2), based on suggestions at
 http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html
 . This makes everything before reduceByKey() run pretty smoothly.

 I ran this with more executor-memory and less executors (most important
 thing was fewer executor-cores):

 --num-executors 150 \
 --driver-memory 15g \
 --executor-memory 110g \
 --executor-cores 32 \

 But then, reduceByKey() fails with:

 java.lang.OutOfMemoryError: Java heap space




 On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 The Spark UI names the line number and name of the operation
 (repartition in this case) that it is performing. Only if this information
 is wrong (just a possibility), could it have started groupByKey already.

 I will try to analyze the amount of skew in the data by using
 reduceByKey (or simply countByKey) which is relatively inexpensive. For the
 purposes of this algorithm I can simply log and remove keys with huge
 counts, before doing groupByKey.

 On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson ilike...@gmail.com
 wrote:

 All stated symptoms are consistent with GC pressure (other nodes
 timeout trying to connect because of a long stop-the-world), quite possibly
 due to groupByKey. groupByKey is a very expensive operation as it may bring
 all the data for a particular partition into memory (in particular, it
 cannot spill values for a single key, so if you have a single very skewed
 key you can get behavior like this).

 On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

 But groupbykey will repartition according to numer of keys as I
 understand how it works. How do you know that you haven't reached the
 groupbykey phase? Are you using a profiler or do yoi base that assumption
 only on logs?

 sob., 28 lut 2015, 8:12 PM Arun Luthra użytkownik 
 arun.lut...@gmail.com napisał:

 A correction to my first post:

 There is also a repartition right before groupByKey to help avoid
 too-many-open-files error:


 rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

 On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 The job fails before getting to groupByKey.

 I see a lot of timeout errors in the yarn logs, like:

 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1
 attempts
 akka.pattern.AskTimeoutException: Timed out

 and

 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2
 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]

 and some of these are followed by:

 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend:
 Driver Disassociated [akka.tcp://sparkExecutor@...] -
 [akka.tcp://sparkDriver@...] disassociated! Shutting down.
 15/02/28 12:48:02 ERROR executor.Executor: Exception in task
 421027.0 in stage 1.0 (TID 336601)
 java.io.FileNotFoundException:
 

Re: Problem getting program to run on 15TB input

2015-03-02 Thread Arun Luthra
Everything works smoothly if I do the 99%-removal filter in Hive first. So,
all the baggage from garbage collection was breaking it.

Is there a way to filter() out 99% of the data without having to garbage
collect 99% of the RDD?

On Sun, Mar 1, 2015 at 9:56 AM, Arun Luthra arun.lut...@gmail.com wrote:

 I tried a shorter simper version of the program, with just 1 RDD,
  essentially it is:

 sc.textFile(..., N).map().filter().map( blah = (id,
 1L)).reduceByKey().saveAsTextFile(...)

 Here is a typical GC log trace from one of the yarn container logs:

 54.040: [GC [PSYoungGen: 9176064K-28206K(10704896K)]
 9176064K-28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01,
 real=0.02 secs]
 77.864: [GC [PSYoungGen: 9204270K-150553K(10704896K)]
 9204342K-150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26,
 real=0.04 secs]
 79.485: [GC [PSYoungGen: 9326617K-333519K(10704896K)]
 9326705K-333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28,
 real=0.08 secs]
 92.974: [GC [PSYoungGen: 9509583K-193370K(10704896K)]
 9509679K-193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11,
 real=0.02 secs]
 114.842: [GC [PSYoungGen: 9369434K-123577K(10704896K)]
 9369538K-123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00,
 real=0.02 secs]
 117.277: [GC [PSYoungGen: 9299641K-135459K(11918336K)]
 9299753K-135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25,
 real=0.02 secs]

 So ~9GB is getting GC'ed every few seconds. Which seems like a lot.

 Question: The filter() is removing 99% of the data. Does this 99% of the
 data get GC'ed?

 Now, I was able to finally get to reduceByKey() by reducing the number of
 executor-cores (to 2), based on suggestions at
 http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html
 . This makes everything before reduceByKey() run pretty smoothly.

 I ran this with more executor-memory and less executors (most important
 thing was fewer executor-cores):

 --num-executors 150 \
 --driver-memory 15g \
 --executor-memory 110g \
 --executor-cores 32 \

 But then, reduceByKey() fails with:

 java.lang.OutOfMemoryError: Java heap space




 On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 The Spark UI names the line number and name of the operation (repartition
 in this case) that it is performing. Only if this information is wrong
 (just a possibility), could it have started groupByKey already.

 I will try to analyze the amount of skew in the data by using reduceByKey
 (or simply countByKey) which is relatively inexpensive. For the purposes of
 this algorithm I can simply log and remove keys with huge counts, before
 doing groupByKey.

 On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson ilike...@gmail.com
 wrote:

 All stated symptoms are consistent with GC pressure (other nodes timeout
 trying to connect because of a long stop-the-world), quite possibly due to
 groupByKey. groupByKey is a very expensive operation as it may bring all
 the data for a particular partition into memory (in particular, it cannot
 spill values for a single key, so if you have a single very skewed key you
 can get behavior like this).

 On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

 But groupbykey will repartition according to the number of keys as I
 understand how it works. How do you know that you haven't reached the
 groupbykey phase? Are you using a profiler, or do you base that assumption
 only on logs?

 Sat, Feb 28, 2015, 8:12 PM, user Arun Luthra
 arun.lut...@gmail.com wrote:

 A correction to my first post:

 There is also a repartition right before groupByKey to help avoid
 too-many-open-files error:


 rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

 On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 The job fails before getting to groupByKey.

 I see a lot of timeout errors in the yarn logs, like:

 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1
 attempts
 akka.pattern.AskTimeoutException: Timed out

 and

 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2
 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]

 and some of these are followed by:

 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkExecutor@...] ->
 [akka.tcp://sparkDriver@...] disassociated! Shutting down.
 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0
 in stage 1.0 (TID 336601)
 java.io.FileNotFoundException:
 /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
 (No such file or directory)




 On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

 I would first check whether  there is any possibility that after
 doing groupbykey one of the groups does not fit 

Re: Problem getting program to run on 15TB input

2015-03-01 Thread Arun Luthra
I tried a shorter, simpler version of the program, with just 1 RDD;
 essentially it is:

sc.textFile(..., N).map().filter().map( blah => (id, 1L)).reduceByKey().saveAsTextFile(...)

Here is a typical GC log trace from one of the yarn container logs:

54.040: [GC [PSYoungGen: 9176064K->28206K(10704896K)]
9176064K->28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01,
real=0.02 secs]
77.864: [GC [PSYoungGen: 9204270K->150553K(10704896K)]
9204342K->150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26,
real=0.04 secs]
79.485: [GC [PSYoungGen: 9326617K->333519K(10704896K)]
9326705K->333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28,
real=0.08 secs]
92.974: [GC [PSYoungGen: 9509583K->193370K(10704896K)]
9509679K->193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11,
real=0.02 secs]
114.842: [GC [PSYoungGen: 9369434K->123577K(10704896K)]
9369538K->123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00,
real=0.02 secs]
117.277: [GC [PSYoungGen: 9299641K->135459K(11918336K)]
9299753K->135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25,
real=0.02 secs]

So ~9 GB is getting GC'ed every few seconds, which seems like a lot.

Question: The filter() is removing 99% of the data. Does this 99% of the
data get GC'ed?

Now, I was able to finally get to reduceByKey() by reducing the number of
executor-cores (to 2), based on suggestions at
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html
. This makes everything before reduceByKey() run pretty smoothly.

I ran this with more executor memory and fewer executors (the most
important change was fewer executor-cores):

--num-executors 150 \
--driver-memory 15g \
--executor-memory 110g \
--executor-cores 32 \

But then, reduceByKey() fails with:

java.lang.OutOfMemoryError: Java heap space




On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra arun.lut...@gmail.com wrote:

 The Spark UI names the line number and name of the operation (repartition
 in this case) that it is performing. Only if this information is wrong
 (just a possibility), could it have started groupByKey already.

 I will try to analyze the amount of skew in the data by using reduceByKey
 (or simply countByKey) which is relatively inexpensive. For the purposes of
 this algorithm I can simply log and remove keys with huge counts, before
 doing groupByKey.

 On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson ilike...@gmail.com
 wrote:

 All stated symptoms are consistent with GC pressure (other nodes timeout
 trying to connect because of a long stop-the-world), quite possibly due to
 groupByKey. groupByKey is a very expensive operation as it may bring all
 the data for a particular partition into memory (in particular, it cannot
 spill values for a single key, so if you have a single very skewed key you
 can get behavior like this).

 On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

 But groupbykey will repartition according to the number of keys as I
 understand how it works. How do you know that you haven't reached the
 groupbykey phase? Are you using a profiler, or do you base that assumption
 only on logs?

 Sat, Feb 28, 2015, 8:12 PM, user Arun Luthra arun.lut...@gmail.com
 wrote:

 A correction to my first post:

 There is also a repartition right before groupByKey to help avoid
 too-many-open-files error:


 rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

 On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 The job fails before getting to groupByKey.

 I see a lot of timeout errors in the yarn logs, like:

 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1
 attempts
 akka.pattern.AskTimeoutException: Timed out

 and

 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2
 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]

 and some of these are followed by:

 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkExecutor@...] ->
 [akka.tcp://sparkDriver@...] disassociated! Shutting down.
 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0
 in stage 1.0 (TID 336601)
 java.io.FileNotFoundException:
 /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
 (No such file or directory)




 On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

 I would first check whether  there is any possibility that after
 doing groupbykey one of the groups does not fit in one of the executors'
 memory.

 To back up my theory, instead of doing groupbykey + map try
 reducebykey + mapvalues.

 Let me know if that helped.

 Pawel Szulc
 http://rabbitonweb.com

 Sat, Feb 28, 2015, 6:22 PM, user Arun Luthra
 arun.lut...@gmail.com wrote:

 So, actually I am removing the persist for now, because there is
 

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
A correction to my first post:

There is also a repartition right before groupByKey to help avoid
too-many-open-files error:

rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com wrote:

 The job fails before getting to groupByKey.

 I see a lot of timeout errors in the yarn logs, like:

 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts
 akka.pattern.AskTimeoutException: Timed out

 and

 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]

 and some of these are followed by:

 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkExecutor@...] -> [akka.tcp://sparkDriver@...]
 disassociated! Shutting down.
 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in
 stage 1.0 (TID 336601)
 java.io.FileNotFoundException:
 /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
 (No such file or directory)




 On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com wrote:

 I would first check whether  there is any possibility that after doing
 groupbykey one of the groups does not fit in one of the executors' memory.

 To back up my theory, instead of doing groupbykey + map try reducebykey +
 mapvalues.

 Let me know if that helped.

 Pawel Szulc
 http://rabbitonweb.com

 Sat, Feb 28, 2015, 6:22 PM, user Arun Luthra arun.lut...@gmail.com
 wrote:

 So, actually I am removing the persist for now, because there is
 significant filtering that happens after calling textFile()... but I will
 keep that option in mind.

 I just tried a few different combinations of number of executors,
 executor memory, and more importantly, number of tasks... *all three
 times it failed when approximately 75.1% of the tasks were completed (no
 matter how many tasks resulted from repartitioning the data in
 textfile(..., N))*. Surely this is a strong clue to something?



 On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote:

 Hi,

 Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
 generates many small objects that lead to very long GC time, causing the
 executor lost, heartbeat not received, and GC overhead limit exceeded
 messages.
 Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
 also try `OFF_HEAP` (and use Tachyon).

 Burak

 On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 My program in pseudocode looks like this:

 val conf = new SparkConf().setAppName(Test)
   .set(spark.storage.memoryFraction,0.2) // default 0.6
   .set(spark.shuffle.memoryFraction,0.12) // default 0.2
   .set(spark.shuffle.manager,SORT) // preferred setting for
 optimized joins
   .set(spark.shuffle.consolidateFiles,true) // helpful for
 too many files open
   .set(spark.mesos.coarse, true) // helpful for
 MapOutputTracker errors?
   .set(spark.akka.frameSize,500) // helpful when using
 consolidateFiles=true
   .set(spark.akka.askTimeout, 30)
   .set(spark.shuffle.compress,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.file.transferTo,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.core.connection.ack.wait.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.speculation,true)
   .set(spark.worker.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set(spark.akka.timeout,300) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set(spark.storage.blockManagerSlaveTimeoutMs,12)
   .set(spark.driver.maxResultSize,2048) // in response to
 error: Total size of serialized results of 39901 tasks (1024.0 MB) is
 bigger than spark.driver.maxResultSize (1024.0 MB)
   .set(spark.serializer,
 org.apache.spark.serializer.KryoSerializer)

 .set(spark.kryo.registrator,com.att.bdcoe.cip.ooh.MyRegistrator)
   .set(spark.kryo.registrationRequired, true)

 val rdd1 = sc.textFile(file1).persist(StorageLevel
 .MEMORY_AND_DISK_SER).map(_.split(\\|, -1)...filter(...)

 val rdd2 =
 sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|,
 -1)...filter(...)


 rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()


 I run the code with:
   --num-executors 500 \
   --driver-memory 20g \
   --executor-memory 20g \
   --executor-cores 32 \


 I'm using kryo serialization on everything, including broadcast
 variables.

 Spark creates 145k tasks, and the first stage includes everything
 before groupByKey(). It fails before getting to groupByKey. I have 

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Aaron Davidson
All stated symptoms are consistent with GC pressure (other nodes timeout
trying to connect because of a long stop-the-world), quite possibly due to
groupByKey. groupByKey is a very expensive operation as it may bring all
the data for a particular partition into memory (in particular, it cannot
spill values for a single key, so if you have a single very skewed key you
can get behavior like this).
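
Not something proposed in this thread, but a standard workaround for a single very hot key, when the per-key work can be expressed as an associative aggregation, is to salt the key and aggregate in two phases. A minimal sketch, with made-up key/value types and a plain sum as the aggregation:

import scala.util.Random
import org.apache.spark.rdd.RDD

object SaltedAggregationSketch {
  // Phase 1 spreads each key across `salts` sub-keys so no single reduce task
  // sees all the values for the hot key; phase 2 combines the partial results.
  def saltedSum(keyed: RDD[(String, Long)], salts: Int): RDD[(String, Long)] =
    keyed
      .map { case (k, v) => ((k, Random.nextInt(salts)), v) }
      .reduceByKey(_ + _)                     // partial sums per (key, salt)
      .map { case ((k, _), partial) => (k, partial) }
      .reduceByKey(_ + _)                     // final sum per key
}

This only works because the sum can be computed piecewise; it is not a drop-in replacement for an arbitrary groupByKey + map.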

On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com wrote:

 But groupbykey will repartition according to the number of keys as I understand
 how it works. How do you know that you haven't reached the groupbykey
 phase? Are you using a profiler, or do you base that assumption only on logs?

 Sat, Feb 28, 2015, 8:12 PM, user Arun Luthra arun.lut...@gmail.com
 wrote:

 A correction to my first post:

 There is also a repartition right before groupByKey to help avoid
 too-many-open-files error:


 rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

 On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 The job fails before getting to groupByKey.

 I see a lot of timeout errors in the yarn logs, like:

 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1
 attempts
 akka.pattern.AskTimeoutException: Timed out

 and

 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2
 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]

 and some of these are followed by:

 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkExecutor@...] -> [akka.tcp://sparkDriver@...]
 disassociated! Shutting down.
 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in
 stage 1.0 (TID 336601)
 java.io.FileNotFoundException:
 /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
 (No such file or directory)




 On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

 I would first check whether  there is any possibility that after doing
 groupbykey one of the groups does not fit in one of the executors' memory.

 To back up my theory, instead of doing groupbykey + map try reducebykey
 + mapvalues.

 Let me know if that helped.

 Pawel Szulc
 http://rabbitonweb.com

 Sat, Feb 28, 2015, 6:22 PM, user Arun Luthra
 arun.lut...@gmail.com wrote:

 So, actually I am removing the persist for now, because there is
 significant filtering that happens after calling textFile()... but I will
 keep that option in mind.

 I just tried a few different combinations of number of executors,
 executor memory, and more importantly, number of tasks... *all three
 times it failed when approximately 75.1% of the tasks were completed (no
 matter how many tasks resulted from repartitioning the data in
 textfile(..., N))*. Surely this is a strong clue to something?



 On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote:

 Hi,

 Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
 generates many small objects that lead to very long GC time, causing the
 executor lost, heartbeat not received, and GC overhead limit exceeded
 messages.
 Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
 also try `OFF_HEAP` (and use Tachyon).

 Burak

 On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 My program in pseudocode looks like this:

 val conf = new SparkConf().setAppName(Test)
   .set(spark.storage.memoryFraction,0.2) // default 0.6
   .set(spark.shuffle.memoryFraction,0.12) // default 0.2
   .set(spark.shuffle.manager,SORT) // preferred setting for
 optimized joins
   .set(spark.shuffle.consolidateFiles,true) // helpful for
 too many files open
   .set(spark.mesos.coarse, true) // helpful for
 MapOutputTracker errors?
   .set(spark.akka.frameSize,500) // helpful when using
 consolidateFiles=true
   .set(spark.akka.askTimeout, 30)
   .set(spark.shuffle.compress,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.file.transferTo,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.core.connection.ack.wait.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.speculation,true)
   .set(spark.worker.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set(spark.akka.timeout,300) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set(spark.storage.blockManagerSlaveTimeoutMs,12)
   .set(spark.driver.maxResultSize,2048) // in response to
 error: Total size of serialized results of 39901 tasks (1024.0 MB) is
 bigger than spark.driver.maxResultSize (1024.0 MB)
   .set(spark.serializer,
 

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Paweł Szulc
I would first check whether  there is any possibility that after doing
groupbykey one of the groups does not fit in one of the executors' memory.

To back up my theory, instead of doing groupbykey + map try reducebykey +
mapvalues.
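
As a rough illustration of that swap (a sketch with a made-up record type, assuming the per-key work can be phrased as an associative combine, here a per-key average):

import org.apache.spark.rdd.RDD

object GroupVsReduceSketch {
  case class Record(key: String, value: Long)

  // groupByKey pulls every value for a key into a single task before the map runs.
  def viaGroupByKey(rdd: RDD[Record]): RDD[(String, Double)] =
    rdd.map(r => (r.key, r.value))
       .groupByKey()
       .map { case (k, vs) => (k, vs.sum.toDouble / vs.size) }

  // reduceByKey combines partial (sum, count) pairs map-side, so no task ever
  // has to hold a whole group in memory; mapValues finishes the average.
  def viaReduceByKey(rdd: RDD[Record]): RDD[(String, Double)] =
    rdd.map(r => (r.key, (r.value, 1L)))
       .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
       .mapValues { case (sum, count) => sum.toDouble / count }
}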

Let me know if that helped.

Pawel Szulc
http://rabbitonweb.com

Sat, Feb 28, 2015, 6:22 PM, user Arun Luthra arun.lut...@gmail.com
wrote:

 So, actually I am removing the persist for now, because there is
 significant filtering that happens after calling textFile()... but I will
 keep that option in mind.

 I just tried a few different combinations of number of executors, executor
 memory, and more importantly, number of tasks... *all three times it
 failed when approximately 75.1% of the tasks were completed (no matter how
 many tasks resulted from repartitioning the data in textfile(..., N))*.
 Surely this is a strong clue to something?



 On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote:

 Hi,

 Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
 generates many small objects that lead to very long GC time, causing the
 executor lost, heartbeat not received, and GC overhead limit exceeded
 messages.
 Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also
 try `OFF_HEAP` (and use Tachyon).

 Burak

 On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 My program in pseudocode looks like this:

 val conf = new SparkConf().setAppName(Test)
   .set(spark.storage.memoryFraction,0.2) // default 0.6
   .set(spark.shuffle.memoryFraction,0.12) // default 0.2
   .set(spark.shuffle.manager,SORT) // preferred setting for
 optimized joins
   .set(spark.shuffle.consolidateFiles,true) // helpful for too
 many files open
   .set(spark.mesos.coarse, true) // helpful for MapOutputTracker
 errors?
   .set(spark.akka.frameSize,500) // helpful when using
 consolidateFiles=true
   .set(spark.akka.askTimeout, 30)
   .set(spark.shuffle.compress,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.file.transferTo,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.core.connection.ack.wait.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.speculation,true)
   .set(spark.worker.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set(spark.akka.timeout,300) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set(spark.storage.blockManagerSlaveTimeoutMs,12)
   .set(spark.driver.maxResultSize,2048) // in response to error:
 Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than
 spark.driver.maxResultSize (1024.0 MB)
   .set(spark.serializer,
 org.apache.spark.serializer.KryoSerializer)

 .set(spark.kryo.registrator,com.att.bdcoe.cip.ooh.MyRegistrator)
   .set(spark.kryo.registrationRequired, true)

 val rdd1 = 
 sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|,
 -1)...filter(...)

 val rdd2 =
 sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|,
 -1)...filter(...)


 rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()


 I run the code with:
   --num-executors 500 \
   --driver-memory 20g \
   --executor-memory 20g \
   --executor-cores 32 \


 I'm using kryo serialization on everything, including broadcast
 variables.

 Spark creates 145k tasks, and the first stage includes everything before
 groupByKey(). It fails before getting to groupByKey. I have tried doubling
 and tripling the number of partitions when calling textFile, with no
 success.

 Very similar code (trivial changes, to accommodate different input)
 worked on a smaller input (~8TB)... Not that it was easy to get that
 working.



 Errors vary, here is what I am getting right now:

 ERROR SendingConnection: Exception while reading SendingConnection
 ... java.nio.channels.ClosedChannelException
 (^ guessing that is symptom of something else)

 WARN BlockManagerMasterActor: Removing BlockManager
 BlockManagerId(...) with no recent heart beats: 120030ms exceeds 12ms
 (^ guessing that is symptom of something else)

 ERROR ActorSystemImpl: Uncaught fatal error from thread (...) shutting
 down ActorSystem [sparkDriver]
 *java.lang.OutOfMemoryError: GC overhead limit exceeded*



 Other times I will get messages about executor lost... about 1 message
 per second, after ~~50k tasks complete, until there are almost no executors
 left and progress slows to nothing.

 I ran with verbose GC info; I do see failing yarn containers that have
 multiple (like 30) Full GC messages but I don't know how to interpret if
 that is the problem. Typical Full GC time taken seems ok: [Times:
 user=23.30 sys=0.06, real=1.94 secs]




Re: Problem getting program to run on 15TB input

2015-02-28 Thread Paweł Szulc
But groupbykey will repartition according to the number of keys as I understand
how it works. How do you know that you haven't reached the groupbykey
phase? Are you using a profiler, or do you base that assumption only on logs?

Sat, Feb 28, 2015, 8:12 PM, user Arun Luthra arun.lut...@gmail.com
wrote:

 A correction to my first post:

 There is also a repartition right before groupByKey to help avoid
 too-many-open-files error:


 rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

 On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 The job fails before getting to groupByKey.

 I see a lot of timeout errors in the yarn logs, like:

 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts
 akka.pattern.AskTimeoutException: Timed out

 and

 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]

 and some of these are followed by:

 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkExecutor@...] -> [akka.tcp://sparkDriver@...]
 disassociated! Shutting down.
 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in
 stage 1.0 (TID 336601)
 java.io.FileNotFoundException:
 /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
 (No such file or directory)




 On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

 I would first check whether  there is any possibility that after doing
 groupbykey one of the groups does not fit in one of the executors' memory.

 To back up my theory, instead of doing groupbykey + map try reducebykey
 + mapvalues.

 Let me know if that helped.

 Pawel Szulc
 http://rabbitonweb.com

 Sat, Feb 28, 2015, 6:22 PM, user Arun Luthra arun.lut...@gmail.com
 wrote:

 So, actually I am removing the persist for now, because there is
 significant filtering that happens after calling textFile()... but I will
 keep that option in mind.

 I just tried a few different combinations of number of executors,
 executor memory, and more importantly, number of tasks... *all three
 times it failed when approximately 75.1% of the tasks were completed (no
 matter how many tasks resulted from repartitioning the data in
 textfile(..., N))*. Surely this is a strong clue to something?



 On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote:

 Hi,

 Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
 generates many small objects that lead to very long GC time, causing the
 executor lost, heartbeat not received, and GC overhead limit exceeded
 messages.
 Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
 also try `OFF_HEAP` (and use Tachyon).

 Burak

 On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 My program in pseudocode looks like this:

 val conf = new SparkConf().setAppName(Test)
   .set(spark.storage.memoryFraction,0.2) // default 0.6
   .set(spark.shuffle.memoryFraction,0.12) // default 0.2
   .set(spark.shuffle.manager,SORT) // preferred setting for
 optimized joins
   .set(spark.shuffle.consolidateFiles,true) // helpful for
 too many files open
   .set(spark.mesos.coarse, true) // helpful for
 MapOutputTracker errors?
   .set(spark.akka.frameSize,500) // helpful when using
 consolidateFiles=true
   .set(spark.akka.askTimeout, 30)
   .set(spark.shuffle.compress,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.file.transferTo,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.core.connection.ack.wait.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.speculation,true)
   .set(spark.worker.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set(spark.akka.timeout,300) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set(spark.storage.blockManagerSlaveTimeoutMs,12)
   .set(spark.driver.maxResultSize,2048) // in response to
 error: Total size of serialized results of 39901 tasks (1024.0 MB) is
 bigger than spark.driver.maxResultSize (1024.0 MB)
   .set(spark.serializer,
 org.apache.spark.serializer.KryoSerializer)

 .set(spark.kryo.registrator,com.att.bdcoe.cip.ooh.MyRegistrator)
   .set(spark.kryo.registrationRequired, true)

 val rdd1 = sc.textFile(file1).persist(StorageLevel
 .MEMORY_AND_DISK_SER).map(_.split(\\|, -1)...filter(...)

 val rdd2 =
 sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|,
 -1)...filter(...)


 rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()


 I run the code with:
   

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
So, actually I am removing the persist for now, because there is
significant filtering that happens after calling textFile()... but I will
keep that option in mind.

I just tried a few different combinations of number of executors, executor
memory, and more importantly, number of tasks... *all three times it failed
when approximately 75.1% of the tasks were completed (no matter how many
tasks resulted from repartitioning the data in textfile(..., N))*. Surely
this is a strong clue to something?



On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote:

 Hi,

 Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER` generates
 many small objects that lead to very long GC time, causing the executor
 lost, heartbeat not received, and GC overhead limit exceeded messages.
 Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also
 try `OFF_HEAP` (and use Tachyon).

 Burak

 On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 My program in pseudocode looks like this:

 val conf = new SparkConf().setAppName(Test)
   .set(spark.storage.memoryFraction,0.2) // default 0.6
   .set(spark.shuffle.memoryFraction,0.12) // default 0.2
   .set(spark.shuffle.manager,SORT) // preferred setting for
 optimized joins
   .set(spark.shuffle.consolidateFiles,true) // helpful for too
 many files open
   .set(spark.mesos.coarse, true) // helpful for MapOutputTracker
 errors?
   .set(spark.akka.frameSize,500) // helpful when using
 consolidateFiles=true
   .set(spark.akka.askTimeout, 30)
   .set(spark.shuffle.compress,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.file.transferTo,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.core.connection.ack.wait.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.speculation,true)
   .set(spark.worker.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set(spark.akka.timeout,300) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set(spark.storage.blockManagerSlaveTimeoutMs,12)
   .set(spark.driver.maxResultSize,2048) // in response to error:
 Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than
 spark.driver.maxResultSize (1024.0 MB)
   .set(spark.serializer,
 org.apache.spark.serializer.KryoSerializer)
   .set(spark.kryo.registrator,com.att.bdcoe.cip.ooh.MyRegistrator)
   .set(spark.kryo.registrationRequired, true)

 val rdd1 = 
 sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|,
 -1)...filter(...)

 val rdd2 =
 sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|,
 -1)...filter(...)


 rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()


 I run the code with:
   --num-executors 500 \
   --driver-memory 20g \
   --executor-memory 20g \
   --executor-cores 32 \


 I'm using kryo serialization on everything, including broadcast
 variables.

 Spark creates 145k tasks, and the first stage includes everything before
 groupByKey(). It fails before getting to groupByKey. I have tried doubling
 and tripling the number of partitions when calling textFile, with no
 success.

 Very similar code (trivial changes, to accommodate different input) worked
 on a smaller input (~8TB)... Not that it was easy to get that working.



 Errors vary, here is what I am getting right now:

 ERROR SendingConnection: Exception while reading SendingConnection
 ... java.nio.channels.ClosedChannelException
 (^ guessing that is symptom of something else)

 WARN BlockManagerMasterActor: Removing BlockManager
 BlockManagerId(...) with no recent heart beats: 120030ms exceeds 12ms
 (^ guessing that is symptom of something else)

 ERROR ActorSystemImpl: Uncaught fatal error from thread (...) shutting
 down ActorSystem [sparkDriver]
 *java.lang.OutOfMemoryError: GC overhead limit exceeded*



 Other times I will get messages about executor lost... about 1 message
 per second, after ~~50k tasks complete, until there are almost no executors
 left and progress slows to nothing.

 I ran with verbose GC info; I do see failing yarn containers that have
 multiple (like 30) Full GC messages but I don't know how to interpret if
 that is the problem. Typical Full GC time taken seems ok: [Times:
 user=23.30 sys=0.06, real=1.94 secs]



 Suggestions, please?

 Huge thanks for useful suggestions,
 Arun





Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
The job fails before getting to groupByKey.

I see a lot of timeout errors in the yarn logs, like:

15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts
akka.pattern.AskTimeoutException: Timed out

and

15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]

and some of these are followed by:

15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
Disassociated [akka.tcp://sparkExecutor@...] -> [akka.tcp://sparkDriver@...]
disassociated! Shutting down.
15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in
stage 1.0 (TID 336601)
java.io.FileNotFoundException:
/hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
(No such file or directory)




On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com wrote:

 I would first check whether  there is any possibility that after doing
 groupbykey one of the groups does not fit in one of the executors' memory.

 To back up my theory, instead of doing groupbykey + map try reducebykey +
 mapvalues.

 Let me know if that helped.

 Pawel Szulc
 http://rabbitonweb.com

 Sat, Feb 28, 2015, 6:22 PM, user Arun Luthra arun.lut...@gmail.com
 wrote:

 So, actually I am removing the persist for now, because there is
 significant filtering that happens after calling textFile()... but I will
 keep that option in mind.

 I just tried a few different combinations of number of executors,
 executor memory, and more importantly, number of tasks... *all three
 times it failed when approximately 75.1% of the tasks were completed (no
 matter how many tasks resulted from repartitioning the data in
 textfile(..., N))*. Surely this is a strong clue to something?



 On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote:

 Hi,

 Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
 generates many small objects that lead to very long GC time, causing the
 executor lost, heartbeat not received, and GC overhead limit exceeded
 messages.
 Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
 also try `OFF_HEAP` (and use Tachyon).

 Burak

 On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 My program in pseudocode looks like this:

 val conf = new SparkConf().setAppName(Test)
   .set(spark.storage.memoryFraction,0.2) // default 0.6
   .set(spark.shuffle.memoryFraction,0.12) // default 0.2
   .set(spark.shuffle.manager,SORT) // preferred setting for
 optimized joins
   .set(spark.shuffle.consolidateFiles,true) // helpful for too
 many files open
   .set(spark.mesos.coarse, true) // helpful for
 MapOutputTracker errors?
   .set(spark.akka.frameSize,500) // helpful when using
 consolidateFiles=true
   .set(spark.akka.askTimeout, 30)
   .set(spark.shuffle.compress,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.file.transferTo,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.core.connection.ack.wait.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.speculation,true)
   .set(spark.worker.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set(spark.akka.timeout,300) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set(spark.storage.blockManagerSlaveTimeoutMs,12)
   .set(spark.driver.maxResultSize,2048) // in response to
 error: Total size of serialized results of 39901 tasks (1024.0 MB) is
 bigger than spark.driver.maxResultSize (1024.0 MB)
   .set(spark.serializer,
 org.apache.spark.serializer.KryoSerializer)

 .set(spark.kryo.registrator,com.att.bdcoe.cip.ooh.MyRegistrator)
   .set(spark.kryo.registrationRequired, true)

 val rdd1 = 
 sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|,
 -1)...filter(...)

 val rdd2 =
 sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|,
 -1)...filter(...)


 rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()


 I run the code with:
   --num-executors 500 \
   --driver-memory 20g \
   --executor-memory 20g \
   --executor-cores 32 \


 I'm using kryo serialization on everything, including broadcast
 variables.

 Spark creates 145k tasks, and the first stage includes everything
 before groupByKey(). It fails before getting to groupByKey. I have tried
 doubling and tripling the number of partitions when calling textFile, with
 no success.

 Very similar code (trivial changes, to accommodate different input)
 worked on a smaller input (~8TB)... Not that it was easy to get that
 working.



 Errors vary, here is what I am getting right now:

 ERROR SendingConnection: 

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
The Spark UI names the line number and name of the operation (repartition
in this case) that it is performing. Only if this information is wrong
(just a possibility), could it have started groupByKey already.

I will try to analyze the amount of skew in the data by using reduceByKey
(or simply countByKey) which is relatively inexpensive. For the purposes of
this algorithm I can simply log and remove keys with huge counts, before
doing groupByKey.
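
A sketch of that kind of check (the names and the cutoff are made up, and it assumes the set of over-the-threshold keys is small enough to collect to the driver):

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

object SkewCheckSketch {
  // `keyed` is the (key, value) RDD that would otherwise go straight into
  // repartition(...).groupByKey(); maxPerKey is the cutoff for a "huge" key.
  def dropHotKeys[V: ClassTag](keyed: RDD[(String, V)], maxPerKey: Long): RDD[(String, V)] = {
    // Cheap per-key counts via reduceByKey (combined map-side).
    val counts = keyed.mapValues(_ => 1L).reduceByKey(_ + _)

    val hot = counts.filter { case (_, n) => n > maxPerKey }.collect()
    hot.foreach { case (k, n) => println(s"dropping skewed key $k with $n records") } // log ...
    val hotSet = hot.map(_._1).toSet

    keyed.filter { case (k, _) => !hotSet.contains(k) }                               // ... and remove
  }
}

The RDD it returns can then go through the existing repartition(...).groupByKey() pipeline.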

On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson ilike...@gmail.com wrote:

 All stated symptoms are consistent with GC pressure (other nodes timeout
 trying to connect because of a long stop-the-world), quite possibly due to
 groupByKey. groupByKey is a very expensive operation as it may bring all
 the data for a particular partition into memory (in particular, it cannot
 spill values for a single key, so if you have a single very skewed key you
 can get behavior like this).

 On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

 But groupbykey will repartition according to the number of keys as I
 understand how it works. How do you know that you haven't reached the
 groupbykey phase? Are you using a profiler, or do you base that assumption
 only on logs?

 Sat, Feb 28, 2015, 8:12 PM, user Arun Luthra arun.lut...@gmail.com
 wrote:

 A correction to my first post:

 There is also a repartition right before groupByKey to help avoid
 too-many-open-files error:


 rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

 On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 The job fails before getting to groupByKey.

 I see a lot of timeout errors in the yarn logs, like:

 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1
 attempts
 akka.pattern.AskTimeoutException: Timed out

 and

 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2
 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]

 and some of these are followed by:

 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkExecutor@...] ->
 [akka.tcp://sparkDriver@...] disassociated! Shutting down.
 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0
 in stage 1.0 (TID 336601)
 java.io.FileNotFoundException:
 /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
 (No such file or directory)




 On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

 I would first check whether  there is any possibility that after doing
 groupbykey one of the groups does not fit in one of the executors' memory.

 To back up my theory, instead of doing groupbykey + map try
 reducebykey + mapvalues.

 Let me know if that helped.

 Pawel Szulc
 http://rabbitonweb.com

 Sat, Feb 28, 2015, 6:22 PM, user Arun Luthra
 arun.lut...@gmail.com wrote:

 So, actually I am removing the persist for now, because there is
 significant filtering that happens after calling textFile()... but I will
 keep that option in mind.

 I just tried a few different combinations of number of executors,
 executor memory, and more importantly, number of tasks... *all three
 times it failed when approximately 75.1% of the tasks were completed (no
 matter how many tasks resulted from repartitioning the data in
 textfile(..., N))*. Surely this is a strong clue to something?



 On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com
 wrote:

 Hi,

 Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
 generates many small objects that lead to very long GC time, causing the
 executor lost, heartbeat not received, and GC overhead limit exceeded
 messages.
 Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
 also try `OFF_HEAP` (and use Tachyon).

 Burak

 On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com
  wrote:

 My program in pseudocode looks like this:

 val conf = new SparkConf().setAppName(Test)
   .set(spark.storage.memoryFraction,0.2) // default 0.6
   .set(spark.shuffle.memoryFraction,0.12) // default 0.2
   .set(spark.shuffle.manager,SORT) // preferred setting for
 optimized joins
   .set(spark.shuffle.consolidateFiles,true) // helpful for
 too many files open
   .set(spark.mesos.coarse, true) // helpful for
 MapOutputTracker errors?
   .set(spark.akka.frameSize,500) // helpful when using
 consolidateFiles=true
   .set(spark.akka.askTimeout, 30)
   .set(spark.shuffle.compress,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.file.transferTo,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.core.connection.ack.wait.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.speculation,true)
  

Re: Problem getting program to run on 15TB input

2015-02-27 Thread Burak Yavuz
Hi,

Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER` generates
many small objects that lead to very long GC time, causing the executor
lost, heartbeat not received, and GC overhead limit exceeded messages.
Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also
try `OFF_HEAP` (and use Tachyon).
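
Concretely, the swap is just the storage level passed to persist. A minimal sketch (the path is a placeholder; the real pipeline is in the program quoted below):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object StorageLevelSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("storage-level-sketch"))

    // MEMORY_AND_DISK keeps deserialized objects in memory and spills whole
    // partitions to disk when they do not fit; MEMORY_AND_DISK_SER (the
    // original choice) keeps serialized byte buffers in memory instead.
    // StorageLevel.OFF_HEAP is the Tachyon-backed option mentioned above.
    val rdd1 = sc.textFile("hdfs:///data/file1")
      .persist(StorageLevel.MEMORY_AND_DISK)

    println(rdd1.count())
    sc.stop()
  }
}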

Burak

On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com wrote:

 My program in pseudocode looks like this:

 val conf = new SparkConf().setAppName("Test")
   .set("spark.storage.memoryFraction", "0.2") // default 0.6
   .set("spark.shuffle.memoryFraction", "0.12") // default 0.2
   .set("spark.shuffle.manager", "SORT") // preferred setting for optimized joins
   .set("spark.shuffle.consolidateFiles", "true") // helpful for too many files open
   .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker errors?
   .set("spark.akka.frameSize", "500") // helpful when using consolidateFiles=true
   .set("spark.akka.askTimeout", "30")
   .set("spark.shuffle.compress", "false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set("spark.file.transferTo", "false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set("spark.core.connection.ack.wait.timeout", "600") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set("spark.speculation", "true")
   .set("spark.worker.timeout", "600") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set("spark.akka.timeout", "300") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set("spark.storage.blockManagerSlaveTimeoutMs", "12")
   .set("spark.driver.maxResultSize", "2048") // in response to error: Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   .set("spark.kryo.registrator", "com.att.bdcoe.cip.ooh.MyRegistrator")
   .set("spark.kryo.registrationRequired", "true")

 val rdd1 = sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|", -1))...filter(...)

 val rdd2 = sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|", -1))...filter(...)

 rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()


 I run the code with:
   --num-executors 500 \
   --driver-memory 20g \
   --executor-memory 20g \
   --executor-cores 32 \


 I'm using kryo serialization on everything, including broadcast variables.

 Spark creates 145k tasks, and the first stage includes everything before
 groupByKey(). It fails before getting to groupByKey. I have tried doubling
 and tripling the number of partitions when calling textFile, with no
 success.

 Very similar code (trivial changes, to accommodate different input) worked
 on a smaller input (~8TB)... Not that it was easy to get that working.



 Errors vary, here is what I am getting right now:

 ERROR SendingConnection: Exception while reading SendingConnection
 ... java.nio.channels.ClosedChannelException
 (^ guessing that is symptom of something else)

 WARN BlockManagerMasterActor: Removing BlockManager
 BlockManagerId(...) with no recent heart beats: 120030ms exceeds 12ms
 (^ guessing that is symptom of something else)

 ERROR ActorSystemImpl: Uncaught fatal error from thread (...) shutting
 down ActorSystem [sparkDriver]
 *java.lang.OutOfMemoryError: GC overhead limit exceeded*



 Other times I will get messages about executor lost... about 1 message
 per second, after ~~50k tasks complete, until there are almost no executors
 left and progress slows to nothing.

 I ran with verbose GC info; I do see failing yarn containers that have
 multiple (like 30) Full GC messages but I don't know how to interpret if
 that is the problem. Typical Full GC time taken seems ok: [Times:
 user=23.30 sys=0.06, real=1.94 secs]



 Suggestions, please?

 Huge thanks for useful suggestions,
 Arun



Re: Problem getting pyspark-cassandra and pyspark working

2015-02-16 Thread Mohamed Lrhazi
Yes, I am sure the system can't find the jar... but how do I fix that? My
submit command includes the jar:

/spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
/spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
/spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py

and the spark output seems to indicate it is handling it:

15/02/16 05:58:46 INFO SparkContext: Added JAR
file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
timestamp 1424066326632


I don't really know what else I could try; any suggestions would be highly
appreciated.

Thanks,
Mohamed.


On Mon, Feb 16, 2015 at 4:04 PM, Davies Liu dav...@databricks.com wrote:

  It seems that the jar for Cassandra is not loaded; you should have
  it in the classpath.

 On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi
 mohamed.lrh...@georgetown.edu wrote:
  Hello all,
 
  Trying the example code from this package
  (https://github.com/Parsely/pyspark-cassandra) , I always get this
 error...
 
  Can you see what I am doing wrong? From googling around it seems to be
 that
  the jar is not found somehow...  The spark log shows the JAR was
 processed
  at least.
 
  Thank you so much.
 
  am using spark-1.2.1-bin-hadoop2.4.tgz
 
  test2.py is simply:
 
  from pyspark.context import SparkConf
  from pyspark_cassandra import CassandraSparkContext, saveToCassandra
  conf = SparkConf().setAppName("PySpark Cassandra Sample Driver")
  conf.set("spark.cassandra.connection.host", "devzero")
  sc = CassandraSparkContext(conf=conf)
 
  [root@devzero spark]# /usr/local/bin/docker-enter  spark-master bash -c
  /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py
  ...
  15/02/16 05:58:45 INFO Slf4jLogger: Slf4jLogger started
  15/02/16 05:58:45 INFO Remoting: Starting remoting
  15/02/16 05:58:45 INFO Remoting: Remoting started; listening on addresses
  :[akka.tcp://sparkDriver@devzero:38917]
  15/02/16 05:58:45 INFO Utils: Successfully started service 'sparkDriver'
 on
  port 38917.
  15/02/16 05:58:45 INFO SparkEnv: Registering MapOutputTracker
  15/02/16 05:58:45 INFO SparkEnv: Registering BlockManagerMaster
  15/02/16 05:58:45 INFO DiskBlockManager: Created local directory at
 
 /tmp/spark-6cdca68b-edec-4a31-b3c1-a7e9d60191e7/spark-0e977468-6e31-4bba-959a-135d9ebda193
  15/02/16 05:58:45 INFO MemoryStore: MemoryStore started with capacity
 265.4
  MB
  15/02/16 05:58:45 WARN NativeCodeLoader: Unable to load native-hadoop
  library for your platform... using builtin-java classes where applicable
  15/02/16 05:58:46 INFO HttpFileServer: HTTP File server directory is
 
 /tmp/spark-af61f7f5-7c0e-412c-8352-263338335fa5/spark-10b3891f-0321-44fe-ba60-1a8c102fd647
  15/02/16 05:58:46 INFO HttpServer: Starting HTTP Server
  15/02/16 05:58:46 INFO Utils: Successfully started service 'HTTP file
  server' on port 56642.
  15/02/16 05:58:46 INFO Utils: Successfully started service 'SparkUI' on
 port
  4040.
  15/02/16 05:58:46 INFO SparkUI: Started SparkUI at http://devzero:4040
  15/02/16 05:58:46 INFO SparkContext: Added JAR
  file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
  http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
  timestamp 1424066326632
  15/02/16 05:58:46 INFO Utils: Copying /spark/test2.py to
 
 /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/test2.py
  15/02/16 05:58:46 INFO SparkContext: Added file file:/spark/test2.py at
  http://10.212.55.42:56642/files/test2.py with timestamp 1424066326633
  15/02/16 05:58:46 INFO Utils: Copying /spark/pyspark_cassandra.py to
 
 /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/pyspark_cassandra.py
  15/02/16 05:58:46 INFO SparkContext: Added file
  file:/spark/pyspark_cassandra.py at
  http://10.212.55.42:56642/files/pyspark_cassandra.py with timestamp
  1424066326642
  15/02/16 05:58:46 INFO Executor: Starting executor ID driver on host
  localhost
  15/02/16 05:58:46 INFO AkkaUtils: Connecting to HeartbeatReceiver:
  akka.tcp://sparkDriver@devzero:38917/user/HeartbeatReceiver
  15/02/16 05:58:46 INFO NettyBlockTransferService: Server created on 32895
  15/02/16 05:58:46 INFO BlockManagerMaster: Trying to register
 BlockManager
  15/02/16 05:58:46 INFO BlockManagerMasterActor: Registering block manager
  localhost:32895 with 265.4 MB RAM, BlockManagerId(driver, localhost,
  32895)
  15/02/16 05:58:46 INFO BlockManagerMaster: Registered BlockManager
  15/02/16 05:58:47 INFO SparkUI: Stopped Spark web UI at
 http://devzero:4040
  15/02/16 05:58:47 INFO DAGScheduler: Stopping DAGScheduler
  15/02/16 05:58:48 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor
  stopped!
  15/02/16 05:58:48 INFO MemoryStore: MemoryStore cleared
  15/02/16 05:58:48 INFO BlockManager: BlockManager stopped
  

Re: Problem getting pyspark-cassandra and pyspark working

2015-02-16 Thread Davies Liu
It also needs the Cassandra jar: com.datastax.spark.connector.CassandraJavaUtil

Is it included in  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar ?



On Mon, Feb 16, 2015 at 1:20 PM, Mohamed Lrhazi
mohamed.lrh...@georgetown.edu wrote:
 Yes, I am sure the system can't find the jar... but how do I fix that? My
 submit command includes the jar:

 /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
 /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
 /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py

 and the spark output seems to indicate it is handling it:

 15/02/16 05:58:46 INFO SparkContext: Added JAR
 file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
 http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
 timestamp 1424066326632


 I don't really know what else I could try; any suggestions would be highly
 appreciated.

 Thanks,
 Mohamed.


 On Mon, Feb 16, 2015 at 4:04 PM, Davies Liu dav...@databricks.com wrote:

  It seems that the jar for Cassandra is not loaded; you should have
  it in the classpath.

 On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi
 mohamed.lrh...@georgetown.edu wrote:
  Hello all,
 
  Trying the example code from this package
  (https://github.com/Parsely/pyspark-cassandra) , I always get this
  error...
 
  Can you see what I am doing wrong? From googling around it seems to be
  that
  the jar is not found somehow...  The spark log shows the JAR was
  processed
  at least.
 
  Thank you so much.
 
  am using spark-1.2.1-bin-hadoop2.4.tgz
 
  test2.py is simply:
 
  from pyspark.context import SparkConf
  from pyspark_cassandra import CassandraSparkContext, saveToCassandra
  conf = SparkConf().setAppName("PySpark Cassandra Sample Driver")
  conf.set("spark.cassandra.connection.host", "devzero")
  sc = CassandraSparkContext(conf=conf)
 
  [root@devzero spark]# /usr/local/bin/docker-enter  spark-master bash -c
  /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py
  ...
  15/02/16 05:58:45 INFO Slf4jLogger: Slf4jLogger started
  15/02/16 05:58:45 INFO Remoting: Starting remoting
  15/02/16 05:58:45 INFO Remoting: Remoting started; listening on
  addresses
  :[akka.tcp://sparkDriver@devzero:38917]
  15/02/16 05:58:45 INFO Utils: Successfully started service 'sparkDriver'
  on
  port 38917.
  15/02/16 05:58:45 INFO SparkEnv: Registering MapOutputTracker
  15/02/16 05:58:45 INFO SparkEnv: Registering BlockManagerMaster
  15/02/16 05:58:45 INFO DiskBlockManager: Created local directory at
 
  /tmp/spark-6cdca68b-edec-4a31-b3c1-a7e9d60191e7/spark-0e977468-6e31-4bba-959a-135d9ebda193
  15/02/16 05:58:45 INFO MemoryStore: MemoryStore started with capacity
  265.4
  MB
  15/02/16 05:58:45 WARN NativeCodeLoader: Unable to load native-hadoop
  library for your platform... using builtin-java classes where applicable
  15/02/16 05:58:46 INFO HttpFileServer: HTTP File server directory is
 
  /tmp/spark-af61f7f5-7c0e-412c-8352-263338335fa5/spark-10b3891f-0321-44fe-ba60-1a8c102fd647
  15/02/16 05:58:46 INFO HttpServer: Starting HTTP Server
  15/02/16 05:58:46 INFO Utils: Successfully started service 'HTTP file
  server' on port 56642.
  15/02/16 05:58:46 INFO Utils: Successfully started service 'SparkUI' on
  port
  4040.
  15/02/16 05:58:46 INFO SparkUI: Started SparkUI at http://devzero:4040
  15/02/16 05:58:46 INFO SparkContext: Added JAR
  file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
  http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
  timestamp 1424066326632
  15/02/16 05:58:46 INFO Utils: Copying /spark/test2.py to
 
  /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/test2.py
  15/02/16 05:58:46 INFO SparkContext: Added file file:/spark/test2.py at
  http://10.212.55.42:56642/files/test2.py with timestamp 1424066326633
  15/02/16 05:58:46 INFO Utils: Copying /spark/pyspark_cassandra.py to
 
  /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/pyspark_cassandra.py
  15/02/16 05:58:46 INFO SparkContext: Added file
  file:/spark/pyspark_cassandra.py at
  http://10.212.55.42:56642/files/pyspark_cassandra.py with timestamp
  1424066326642
  15/02/16 05:58:46 INFO Executor: Starting executor ID driver on host
  localhost
  15/02/16 05:58:46 INFO AkkaUtils: Connecting to HeartbeatReceiver:
  akka.tcp://sparkDriver@devzero:38917/user/HeartbeatReceiver
  15/02/16 05:58:46 INFO NettyBlockTransferService: Server created on
  32895
  15/02/16 05:58:46 INFO BlockManagerMaster: Trying to register
  BlockManager
  15/02/16 05:58:46 INFO BlockManagerMasterActor: Registering block
  manager
  localhost:32895 with 265.4 MB RAM, BlockManagerId(driver, localhost,
  32895)
  15/02/16 05:58:46 INFO BlockManagerMaster: Registered BlockManager
  15/02/16 05:58:47 INFO SparkUI: Stopped Spark web UI at
  http://devzero:4040
  15/02/16 

Re: Problem getting pyspark-cassandra and pyspark working

2015-02-16 Thread Mohamed Lrhazi
Oh, I don't know. Thanks a lot Davies, I'm going to figure that out now.

On Mon, Feb 16, 2015 at 5:31 PM, Davies Liu dav...@databricks.com wrote:

 It also needs the Cassandra jar:
 com.datastax.spark.connector.CassandraJavaUtil

 Is it included in  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar ?



 On Mon, Feb 16, 2015 at 1:20 PM, Mohamed Lrhazi
 mohamed.lrh...@georgetown.edu wrote:
  Yes, I am sure the system can't find the jar... but how do I fix that? My
  submit command includes the jar:
 
  /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py
 
  and the spark output seems to indicate it is handling it:
 
  15/02/16 05:58:46 INFO SparkContext: Added JAR
  file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
  http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
  timestamp 1424066326632
 
 
  I don't really know what else I could try; any suggestions would be highly
  appreciated.
 
  Thanks,
  Mohamed.
 
 
  On Mon, Feb 16, 2015 at 4:04 PM, Davies Liu dav...@databricks.com
 wrote:
 
   It seems that the jar for Cassandra is not loaded; you should have
   it in the classpath.
 
  On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi
  mohamed.lrh...@georgetown.edu wrote:
   Hello all,
  
   Trying the example code from this package
   (https://github.com/Parsely/pyspark-cassandra) , I always get this
   error...
  
   Can you see what I am doing wrong? from googling arounf it seems to be
   that
   the jar is not found somehow...  The spark log shows the JAR was
   processed
   at least.
  
   Thank you so much.
  
   am using spark-1.2.1-bin-hadoop2.4.tgz
  
   test2.py is simply:
  
    from pyspark.context import SparkConf
    from pyspark_cassandra import CassandraSparkContext, saveToCassandra
    conf = SparkConf().setAppName("PySpark Cassandra Sample Driver")
    conf.set("spark.cassandra.connection.host", "devzero")
    sc = CassandraSparkContext(conf=conf)
  
   [root@devzero spark]# /usr/local/bin/docker-enter  spark-master bash
 -c
   /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
   /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
   /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py
   ...
   15/02/16 05:58:45 INFO Slf4jLogger: Slf4jLogger started
   15/02/16 05:58:45 INFO Remoting: Starting remoting
   15/02/16 05:58:45 INFO Remoting: Remoting started; listening on
   addresses
   :[akka.tcp://sparkDriver@devzero:38917]
   15/02/16 05:58:45 INFO Utils: Successfully started service
 'sparkDriver'
   on
   port 38917.
   15/02/16 05:58:45 INFO SparkEnv: Registering MapOutputTracker
   15/02/16 05:58:45 INFO SparkEnv: Registering BlockManagerMaster
   15/02/16 05:58:45 INFO DiskBlockManager: Created local directory at
  
  
 /tmp/spark-6cdca68b-edec-4a31-b3c1-a7e9d60191e7/spark-0e977468-6e31-4bba-959a-135d9ebda193
   15/02/16 05:58:45 INFO MemoryStore: MemoryStore started with capacity
   265.4
   MB
   15/02/16 05:58:45 WARN NativeCodeLoader: Unable to load native-hadoop
   library for your platform... using builtin-java classes where
 applicable
   15/02/16 05:58:46 INFO HttpFileServer: HTTP File server directory is
  
  
 /tmp/spark-af61f7f5-7c0e-412c-8352-263338335fa5/spark-10b3891f-0321-44fe-ba60-1a8c102fd647
   15/02/16 05:58:46 INFO HttpServer: Starting HTTP Server
   15/02/16 05:58:46 INFO Utils: Successfully started service 'HTTP file
   server' on port 56642.
   15/02/16 05:58:46 INFO Utils: Successfully started service 'SparkUI'
 on
   port
   4040.
   15/02/16 05:58:46 INFO SparkUI: Started SparkUI at
 http://devzero:4040
   15/02/16 05:58:46 INFO SparkContext: Added JAR
   file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
   http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar
 with
   timestamp 1424066326632
   15/02/16 05:58:46 INFO Utils: Copying /spark/test2.py to
  
  
 /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/test2.py
   15/02/16 05:58:46 INFO SparkContext: Added file file:/spark/test2.py
 at
   http://10.212.55.42:56642/files/test2.py with timestamp 1424066326633
   15/02/16 05:58:46 INFO Utils: Copying /spark/pyspark_cassandra.py to
  
  
 /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/pyspark_cassandra.py
   15/02/16 05:58:46 INFO SparkContext: Added file
   file:/spark/pyspark_cassandra.py at
   http://10.212.55.42:56642/files/pyspark_cassandra.py with timestamp
   1424066326642
   15/02/16 05:58:46 INFO Executor: Starting executor ID driver on host
   localhost
   15/02/16 05:58:46 INFO AkkaUtils: Connecting to HeartbeatReceiver:
   akka.tcp://sparkDriver@devzero:38917/user/HeartbeatReceiver
   15/02/16 05:58:46 INFO NettyBlockTransferService: Server created on
   32895
   15/02/16 05:58:46 INFO BlockManagerMaster: Trying to register
   BlockManager
   15/02/16 05:58:46 INFO 

Re: Problem getting pyspark-cassandra and pyspark working

2015-02-16 Thread Davies Liu
It seems that the jar for cassandra is not loaded, you should have
them in the classpath.

On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi
mohamed.lrh...@georgetown.edu wrote:
 Hello all,

 Trying the example code from this package
 (https://github.com/Parsely/pyspark-cassandra) , I always get this error...

 Can you see what I am doing wrong? From googling around, it seems to be that
 the jar is not found somehow...  The spark log shows the JAR was processed
 at least.

 Thank you so much.

 am using spark-1.2.1-bin-hadoop2.4.tgz

 test2.py is simply:

 from pyspark.context import SparkConf
 from pyspark_cassandra import CassandraSparkContext, saveToCassandra
 conf = SparkConf().setAppName("PySpark Cassandra Sample Driver")
 conf.set("spark.cassandra.connection.host", "devzero")
 sc = CassandraSparkContext(conf=conf)

 [root@devzero spark]# /usr/local/bin/docker-enter  spark-master bash -c
 /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
 /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
 /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py
 ...
 15/02/16 05:58:45 INFO Slf4jLogger: Slf4jLogger started
 15/02/16 05:58:45 INFO Remoting: Starting remoting
 15/02/16 05:58:45 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sparkDriver@devzero:38917]
 15/02/16 05:58:45 INFO Utils: Successfully started service 'sparkDriver' on
 port 38917.
 15/02/16 05:58:45 INFO SparkEnv: Registering MapOutputTracker
 15/02/16 05:58:45 INFO SparkEnv: Registering BlockManagerMaster
 15/02/16 05:58:45 INFO DiskBlockManager: Created local directory at
 /tmp/spark-6cdca68b-edec-4a31-b3c1-a7e9d60191e7/spark-0e977468-6e31-4bba-959a-135d9ebda193
 15/02/16 05:58:45 INFO MemoryStore: MemoryStore started with capacity 265.4
 MB
 15/02/16 05:58:45 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 15/02/16 05:58:46 INFO HttpFileServer: HTTP File server directory is
 /tmp/spark-af61f7f5-7c0e-412c-8352-263338335fa5/spark-10b3891f-0321-44fe-ba60-1a8c102fd647
 15/02/16 05:58:46 INFO HttpServer: Starting HTTP Server
 15/02/16 05:58:46 INFO Utils: Successfully started service 'HTTP file
 server' on port 56642.
 15/02/16 05:58:46 INFO Utils: Successfully started service 'SparkUI' on port
 4040.
 15/02/16 05:58:46 INFO SparkUI: Started SparkUI at http://devzero:4040
 15/02/16 05:58:46 INFO SparkContext: Added JAR
 file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
 http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
 timestamp 1424066326632
 15/02/16 05:58:46 INFO Utils: Copying /spark/test2.py to
 /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/test2.py
 15/02/16 05:58:46 INFO SparkContext: Added file file:/spark/test2.py at
 http://10.212.55.42:56642/files/test2.py with timestamp 1424066326633
 15/02/16 05:58:46 INFO Utils: Copying /spark/pyspark_cassandra.py to
 /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/pyspark_cassandra.py
 15/02/16 05:58:46 INFO SparkContext: Added file
 file:/spark/pyspark_cassandra.py at
 http://10.212.55.42:56642/files/pyspark_cassandra.py with timestamp
 1424066326642
 15/02/16 05:58:46 INFO Executor: Starting executor ID driver on host
 localhost
 15/02/16 05:58:46 INFO AkkaUtils: Connecting to HeartbeatReceiver:
 akka.tcp://sparkDriver@devzero:38917/user/HeartbeatReceiver
 15/02/16 05:58:46 INFO NettyBlockTransferService: Server created on 32895
 15/02/16 05:58:46 INFO BlockManagerMaster: Trying to register BlockManager
 15/02/16 05:58:46 INFO BlockManagerMasterActor: Registering block manager
 localhost:32895 with 265.4 MB RAM, BlockManagerId(driver, localhost,
 32895)
 15/02/16 05:58:46 INFO BlockManagerMaster: Registered BlockManager
 15/02/16 05:58:47 INFO SparkUI: Stopped Spark web UI at http://devzero:4040
 15/02/16 05:58:47 INFO DAGScheduler: Stopping DAGScheduler
 15/02/16 05:58:48 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor
 stopped!
 15/02/16 05:58:48 INFO MemoryStore: MemoryStore cleared
 15/02/16 05:58:48 INFO BlockManager: BlockManager stopped
 15/02/16 05:58:48 INFO BlockManagerMaster: BlockManagerMaster stopped
 15/02/16 05:58:48 INFO SparkContext: Successfully stopped SparkContext
 15/02/16 05:58:48 INFO RemoteActorRefProvider$RemotingTerminator: Shutting
 down remote daemon.
 15/02/16 05:58:48 INFO RemoteActorRefProvider$RemotingTerminator: Remote
 daemon shut down; proceeding with flushing remote transports.
 15/02/16 05:58:48 INFO RemoteActorRefProvider$RemotingTerminator: Remoting
 shut down.
 Traceback (most recent call last):
   File /spark/test2.py, line 5, in module
 sc = CassandraSparkContext(conf=conf)
   File /spark/python/pyspark/context.py, line 105, in __init__
 conf, jsc)
   File /spark/pyspark_cassandra.py, line 17, in _do_init
 self._jcsc = self._jvm.CassandraJavaUtil.javaFunctions(self._jsc)
   File 

Re: Problem getting pyspark-cassandra and pyspark working

2015-02-16 Thread Mohamed Lrhazi
Will do. Thanks a lot.


On Mon, Feb 16, 2015 at 7:20 PM, Davies Liu dav...@databricks.com wrote:

 Can you try the example in pyspark-cassandra?

 If not, you could create an issue there.

 On Mon, Feb 16, 2015 at 4:07 PM, Mohamed Lrhazi
 mohamed.lrh...@georgetown.edu wrote:
  So I tried building the connector from:
  https://github.com/datastax/spark-cassandra-connector
 
  which seems to include the java class referenced in the error message:
 
  [root@devzero spark]# unzip -l
 
 spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar
  |grep CassandraJavaUtil
 
  14612  02-16-2015 23:25
  com/datastax/spark/connector/japi/CassandraJavaUtil.class
 
  [root@devzero spark]#
 
 
  When I try running my spark test job, I still get the exact same error,
 even
  though both my jars seems to have been processed by spark.
 
 
  ...
  15/02/17 00:00:45 INFO SparkUI: Started SparkUI at http://devzero:4040
  15/02/17 00:00:45 INFO SparkContext: Added JAR
  file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
  http://10.212.55.42:36929/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
  timestamp 1424131245595
  15/02/17 00:00:45 INFO SparkContext: Added JAR
  file:/spark/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar at
 
 http://10.212.55.42:36929/jars/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar
  with timestamp 1424131245623
  15/02/17 00:00:45 INFO Utils: Copying /spark/test2.py to
 
 /tmp/spark-8588b528-d016-42ac-aa7c-e8cf07c1b659/spark-ae3141dd-ae6c-4e99-b7c8-f97ccb3fd8e5/test2.py
  15/02/17 00:00:45 INFO SparkContext: Added file file:/spark/test2.py at
  http://10.212.55.42:36929/files/test2.py with timestamp 1424131245624
  15/02/17 00:00:45 INFO Utils: Copying /spark/pyspark_cassandra.py to
 
 /tmp/spark-8588b528-d016-42ac-aa7c-e8cf07c1b659/spark-ae3141dd-ae6c-4e99-b7c8-f97ccb3fd8e5/pyspark_cassandra.py
  15/02/17 00:00:45 INFO SparkContext: Added file
  file:/spark/pyspark_cassandra.py at
  http://10.212.55.42:36929/files/pyspark_cassandra.py with timestamp
  1424131245633
  15/02/17 00:00:45 INFO Executor: Starting executor ID driver on host
  localhost
  15/
  
  15/02/17 00:00:47 INFO RemoteActorRefProvider$RemotingTerminator:
 Remoting
  shut down.
  Traceback (most recent call last):
File /spark/test2.py, line 5, in module
  sc = CassandraSparkContext(conf=conf)
File /spark/python/pyspark/context.py, line 105, in __init__
  conf, jsc)
File /spark/pyspark_cassandra.py, line 17, in _do_init
  self._jcsc = self._jvm.CassandraJavaUtil.javaFunctions(self._jsc)
File /spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
 line
  726, in __getattr__
  py4j.protocol.Py4JError: Trying to call a package.
 
 
  am I building the wrong connector jar? or using the wrong jar?
 
  Thanks a lot,
  Mohamed.
 
 
 
  On Mon, Feb 16, 2015 at 5:46 PM, Mohamed Lrhazi
  mohamed.lrh...@georgetown.edu wrote:
 
  Oh, I don't know. thanks a lot Davies, gonna figure that out now
 
  On Mon, Feb 16, 2015 at 5:31 PM, Davies Liu dav...@databricks.com
 wrote:
 
  It also need the Cassandra jar:
  com.datastax.spark.connector.CassandraJavaUtil
 
  Is it included in  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar ?
 
 
 
  On Mon, Feb 16, 2015 at 1:20 PM, Mohamed Lrhazi
  mohamed.lrh...@georgetown.edu wrote:
   Yes, am sure the system cant find the jar.. but how do I fix that...
 my
   submit command includes the jar:
  
   /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
   /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
   /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py
  
   and the spark output seems to indicate it is handling it:
  
   15/02/16 05:58:46 INFO SparkContext: Added JAR
   file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
   http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar
 with
   timestamp 1424066326632
  
  
   I don't really know what else I could try any suggestions highly
   appreciated.
  
   Thanks,
   Mohamed.
  
  
   On Mon, Feb 16, 2015 at 4:04 PM, Davies Liu dav...@databricks.com
   wrote:
  
   It seems that the jar for cassandra is not loaded, you should have
   them in the classpath.
  
   On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi
   mohamed.lrh...@georgetown.edu wrote:
Hello all,
   
Trying the example code from this package
(https://github.com/Parsely/pyspark-cassandra) , I always get
 this
error...
   
Can you see what I am doing wrong? from googling arounf it seems
 to
be
that
the jar is not found somehow...  The spark log shows the JAR was
processed
at least.
   
Thank you so much.
   
am using spark-1.2.1-bin-hadoop2.4.tgz
   
test2.py is simply:
   
from pyspark.context import SparkConf
from pyspark_cassandra import CassandraSparkContext,
 saveToCassandra
conf = SparkConf().setAppName(PySpark Cassandra Sample Driver)

Re: Problem getting pyspark-cassandra and pyspark working

2015-02-16 Thread Mohamed Lrhazi
So I tried building the connector from:
https://github.com/datastax/spark-cassandra-connector

which seems to include the java class referenced in the error message:

[root@devzero spark]# unzip -l
spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar
|grep CassandraJavaUtil

14612  02-16-2015 23:25
com/datastax/spark/connector/japi/CassandraJavaUtil.class

[root@devzero spark]#


When I try running my spark test job, I still get the exact same error,
 even though both my jars seem to have been processed by Spark.


...
15/02/17 00:00:45 INFO SparkUI: Started SparkUI at http://devzero:4040
15/02/17 00:00:45 INFO SparkContext: Added JAR
file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
http://10.212.55.42:36929/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
timestamp 1424131245595
15/02/17 00:00:45 INFO SparkContext: Added JAR
file:/spark/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar at
http://10.212.55.42:36929/jars/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar
with timestamp 1424131245623
15/02/17 00:00:45 INFO Utils: Copying /spark/test2.py to
/tmp/spark-8588b528-d016-42ac-aa7c-e8cf07c1b659/spark-ae3141dd-ae6c-4e99-b7c8-f97ccb3fd8e5/test2.py
15/02/17 00:00:45 INFO SparkContext: Added file file:/spark/test2.py at
http://10.212.55.42:36929/files/test2.py with timestamp 1424131245624
15/02/17 00:00:45 INFO Utils: Copying /spark/pyspark_cassandra.py to
/tmp/spark-8588b528-d016-42ac-aa7c-e8cf07c1b659/spark-ae3141dd-ae6c-4e99-b7c8-f97ccb3fd8e5/pyspark_cassandra.py
15/02/17 00:00:45 INFO SparkContext: Added file
file:/spark/pyspark_cassandra.py at
http://10.212.55.42:36929/files/pyspark_cassandra.py with timestamp
1424131245633
15/02/17 00:00:45 INFO Executor: Starting executor ID driver on host
localhost
15/

15/02/17 00:00:47 INFO RemoteActorRefProvider$RemotingTerminator: Remoting
shut down.
Traceback (most recent call last):
  File /spark/test2.py, line 5, in module
sc = CassandraSparkContext(conf=conf)
  File /spark/python/pyspark/context.py, line 105, in __init__
conf, jsc)
  File /spark/pyspark_cassandra.py, line 17, in _do_init
self._jcsc = self._jvm.CassandraJavaUtil.javaFunctions(self._jsc)
  File /spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line
726, in __getattr__
py4j.protocol.Py4JError: Trying to call a package.


am I building the wrong connector jar? or using the wrong jar?

Thanks a lot,
Mohamed.



On Mon, Feb 16, 2015 at 5:46 PM, Mohamed Lrhazi 
mohamed.lrh...@georgetown.edu wrote:

 Oh, I don't know. thanks a lot Davies, gonna figure that out now

 On Mon, Feb 16, 2015 at 5:31 PM, Davies Liu dav...@databricks.com wrote:

 It also need the Cassandra jar:
 com.datastax.spark.connector.CassandraJavaUtil

 Is it included in  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar ?



 On Mon, Feb 16, 2015 at 1:20 PM, Mohamed Lrhazi
 mohamed.lrh...@georgetown.edu wrote:
  Yes, am sure the system cant find the jar.. but how do I fix that... my
  submit command includes the jar:
 
  /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py
 
  and the spark output seems to indicate it is handling it:
 
  15/02/16 05:58:46 INFO SparkContext: Added JAR
  file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
  http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
  timestamp 1424066326632
 
 
  I don't really know what else I could try any suggestions highly
  appreciated.
 
  Thanks,
  Mohamed.
 
 
  On Mon, Feb 16, 2015 at 4:04 PM, Davies Liu dav...@databricks.com
 wrote:
 
  It seems that the jar for cassandra is not loaded, you should have
  them in the classpath.
 
  On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi
  mohamed.lrh...@georgetown.edu wrote:
   Hello all,
  
   Trying the example code from this package
   (https://github.com/Parsely/pyspark-cassandra) , I always get this
   error...
  
   Can you see what I am doing wrong? from googling arounf it seems to
 be
   that
   the jar is not found somehow...  The spark log shows the JAR was
   processed
   at least.
  
   Thank you so much.
  
   am using spark-1.2.1-bin-hadoop2.4.tgz
  
   test2.py is simply:
  
   from pyspark.context import SparkConf
   from pyspark_cassandra import CassandraSparkContext, saveToCassandra
   conf = SparkConf().setAppName(PySpark Cassandra Sample Driver)
   conf.set(spark.cassandra.connection.host, devzero)
   sc = CassandraSparkContext(conf=conf)
  
   [root@devzero spark]# /usr/local/bin/docker-enter  spark-master
 bash -c
   /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py
 --jars
   /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
   /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py
   ...
   15/02/16 05:58:45 INFO Slf4jLogger: Slf4jLogger started
   15/02/16 05:58:45 INFO Remoting: 

Re: Problem getting pyspark-cassandra and pyspark working

2015-02-16 Thread Davies Liu
Can you try the example in pyspark-cassandra?

If not, you could create an issue there.

On Mon, Feb 16, 2015 at 4:07 PM, Mohamed Lrhazi
mohamed.lrh...@georgetown.edu wrote:
 So I tried building the connector from:
 https://github.com/datastax/spark-cassandra-connector

 which seems to include the java class referenced in the error message:

 [root@devzero spark]# unzip -l
 spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar
 |grep CassandraJavaUtil

 14612  02-16-2015 23:25
 com/datastax/spark/connector/japi/CassandraJavaUtil.class

 [root@devzero spark]#


 When I try running my spark test job, I still get the exact same error, even
 though both my jars seems to have been processed by spark.


 ...
 15/02/17 00:00:45 INFO SparkUI: Started SparkUI at http://devzero:4040
 15/02/17 00:00:45 INFO SparkContext: Added JAR
 file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
 http://10.212.55.42:36929/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
 timestamp 1424131245595
 15/02/17 00:00:45 INFO SparkContext: Added JAR
 file:/spark/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar at
 http://10.212.55.42:36929/jars/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar
 with timestamp 1424131245623
 15/02/17 00:00:45 INFO Utils: Copying /spark/test2.py to
 /tmp/spark-8588b528-d016-42ac-aa7c-e8cf07c1b659/spark-ae3141dd-ae6c-4e99-b7c8-f97ccb3fd8e5/test2.py
 15/02/17 00:00:45 INFO SparkContext: Added file file:/spark/test2.py at
 http://10.212.55.42:36929/files/test2.py with timestamp 1424131245624
 15/02/17 00:00:45 INFO Utils: Copying /spark/pyspark_cassandra.py to
 /tmp/spark-8588b528-d016-42ac-aa7c-e8cf07c1b659/spark-ae3141dd-ae6c-4e99-b7c8-f97ccb3fd8e5/pyspark_cassandra.py
 15/02/17 00:00:45 INFO SparkContext: Added file
 file:/spark/pyspark_cassandra.py at
 http://10.212.55.42:36929/files/pyspark_cassandra.py with timestamp
 1424131245633
 15/02/17 00:00:45 INFO Executor: Starting executor ID driver on host
 localhost
 15/
 
 15/02/17 00:00:47 INFO RemoteActorRefProvider$RemotingTerminator: Remoting
 shut down.
 Traceback (most recent call last):
   File /spark/test2.py, line 5, in module
 sc = CassandraSparkContext(conf=conf)
   File /spark/python/pyspark/context.py, line 105, in __init__
 conf, jsc)
   File /spark/pyspark_cassandra.py, line 17, in _do_init
 self._jcsc = self._jvm.CassandraJavaUtil.javaFunctions(self._jsc)
   File /spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line
 726, in __getattr__
 py4j.protocol.Py4JError: Trying to call a package.


 am I building the wrong connector jar? or using the wrong jar?

 Thanks a lot,
 Mohamed.



 On Mon, Feb 16, 2015 at 5:46 PM, Mohamed Lrhazi
 mohamed.lrh...@georgetown.edu wrote:

 Oh, I don't know. thanks a lot Davies, gonna figure that out now

 On Mon, Feb 16, 2015 at 5:31 PM, Davies Liu dav...@databricks.com wrote:

 It also need the Cassandra jar:
 com.datastax.spark.connector.CassandraJavaUtil

 Is it included in  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar ?



 On Mon, Feb 16, 2015 at 1:20 PM, Mohamed Lrhazi
 mohamed.lrh...@georgetown.edu wrote:
  Yes, am sure the system cant find the jar.. but how do I fix that... my
  submit command includes the jar:
 
  /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py
 
  and the spark output seems to indicate it is handling it:
 
  15/02/16 05:58:46 INFO SparkContext: Added JAR
  file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
  http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
  timestamp 1424066326632
 
 
  I don't really know what else I could try any suggestions highly
  appreciated.
 
  Thanks,
  Mohamed.
 
 
  On Mon, Feb 16, 2015 at 4:04 PM, Davies Liu dav...@databricks.com
  wrote:
 
  It seems that the jar for cassandra is not loaded, you should have
  them in the classpath.
 
  On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi
  mohamed.lrh...@georgetown.edu wrote:
   Hello all,
  
   Trying the example code from this package
   (https://github.com/Parsely/pyspark-cassandra) , I always get this
   error...
  
   Can you see what I am doing wrong? from googling arounf it seems to
   be
   that
   the jar is not found somehow...  The spark log shows the JAR was
   processed
   at least.
  
   Thank you so much.
  
   am using spark-1.2.1-bin-hadoop2.4.tgz
  
   test2.py is simply:
  
   from pyspark.context import SparkConf
   from pyspark_cassandra import CassandraSparkContext, saveToCassandra
   conf = SparkConf().setAppName(PySpark Cassandra Sample Driver)
   conf.set(spark.cassandra.connection.host, devzero)
   sc = CassandraSparkContext(conf=conf)
  
   [root@devzero spark]# /usr/local/bin/docker-enter  spark-master bash
   -c
   /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py
   --jars
   

Re: Streaming: getting data from Cassandra based on input stream values

2015-01-23 Thread madhu phatak
Hi,
In that case, you can try the following.

val joinRDD = kafkaStream.transform( streamRDD => {

val ids = streamRDD.map(_._2).collect();

ids.map(userId => ctable.select("user_name").where("userid = ?",
userId).toArray(0).get[String](0))

// better create a query which checks for all those ids at same time
})
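
Filled out a bit, a minimal sketch (assuming the connector's select/where API, exactly one matching row per userid, and a small id set per batch, since transform runs on the driver):

val userNames = kafkaStream.transform { streamRDD =>
  // collect this batch's ids on the driver (assumed to be a small set)
  val ids = streamRDD.map(_._2).collect()

  // one point query per id; a single IN query would cut round trips further
  val rows = ids.map { userId =>
    val name = ctable.select("user_name")
                     .where("userid = ?", userId)
                     .collect()(0)
                     .getString("user_name")
    (userId, name)
  }

  streamRDD.sparkContext.parallelize(rows)
}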


On Sat, Jan 24, 2015 at 3:32 AM, Greg Temchenko s...@dicefield.com wrote:

  Hi Madhu,
 Thanks for your response!
 But as I understand it, in this case you select all the data from the Cassandra
 table. I don't want to do that as it can be huge. I just want to look up some ids
 in the table. So I don't see how I can pass values
 from the streamRDD into the Cassandra query (into the where method).

 Greg



 On 1/23/15 1:11 AM, madhu phatak wrote:

 Hi,
 Seems like you want to get the username for a given user id. You can use
 transform on the Kafka stream to join two RDDs. The pseudo code looks like
 this

  val joinRDD = kafkaStream.transform( streamRDD => {

  streamRDD.map(value => (value._2, value._1)) join with
  (ctable.select("userid", "username"))

  })

 On Fri, Jan 23, 2015 at 10:12 AM, Greg Temchenko s...@dicefield.com
 wrote:

  Hi there,

 I think I have a basic question, but I'm sort of stuck with figuring out
 how to approach it, and I thought someone could point me in the right
 direction.

 I'd like to pull some data from Cassandra based on values received from an
 input stream. Something like

 val ctable = ssc.cassandraTable(keyspace, users)
 val userNames = kafkaStream.flatMap {
   case (key, userid) => {
 val userName = ctable.select("user_name").where("userid = ?",
 userId).toArray(0).get[String](0)
 Some(userId, userName)
   }
 }


 While the Cassandra query works in Spark shell, it throws an exception
 when I used it inside flatMap:

 Exception in thread main org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 0 in stage 46.0 failed 1 times, most recent
 failure: Lost task 0.0 in stage 46.0 (TID 35, localhost):
 java.lang.NullPointerException:
 org.apache.spark.rdd.RDD.init(RDD.scala:125)

 com.datastax.spark.connector.rdd.CassandraRDD.init(CassandraRDD.scala:49)

 com.datastax.spark.connector.rdd.CassandraRDD.copy(CassandraRDD.scala:83)

 com.datastax.spark.connector.rdd.CassandraRDD.select(CassandraRDD.scala:143)

 My understanding is that I cannot produce an RDD (Cassandra results)
 inside another RDD. But how should I approach the problem instead?



 Thanks,

 --
 Greg




  --
  Regards,
 Madhukara Phatak
 http://www.madhukaraphatak.com



 --
 Greg




-- 
Regards,
Madhukara Phatak
http://www.madhukaraphatak.com


Re: Problem getting Spark running on a Yarn cluster

2015-01-06 Thread Akhil Das
Just follow this documentation
http://spark.apache.org/docs/1.1.1/running-on-yarn.html

Ensure that *HADOOP_CONF_DIR* or *YARN_CONF_DIR* points to the directory
which contains the (client side) configuration files for the Hadoop
cluster. These configs are used to write to the dfs and connect to the YARN
ResourceManager.

Most likely you have a wrong configuration in the environment and that's why it's
connecting to *localhost* (127.0.1.1).

Thanks
Best Regards

On Tue, Jan 6, 2015 at 8:10 PM, Sharon Rapoport sha...@plaid.com wrote:

 Hello,

 We have hadoop 2.6.0 and Yarn set up on ec2. Trying to get spark 1.1.1
 running on the Yarn cluster.
 I have of course googled around and found that this problem is solved for
 most after removing the line including 127.0.1.1 from /etc/hosts. This
 hasn’t seemed to solve this for me. Anyone has an idea where else might
 127.0.1.1 be hiding in some conf? Looked everywhere… or is there a
 completely different problem?

 Thanks,
 Sharon

 I am getting this error:

 WARN network.SendingConnection: Error finishing connection to /
 127.0.1.1:47020
 java.net.ConnectException: Connection refused




Re: Cluster getting a null pointer error

2014-12-10 Thread Yana Kadiyska
does spark-submit with SparkPi and spark-examples.jar work?

e.g.

./spark/bin/spark-submit  --class org.apache.spark.examples.SparkPi
--master spark://xx.xx.xx.xx:7077  /path/to/examples.jar


On Tue, Dec 9, 2014 at 6:58 PM, Eric Tanner eric.tan...@justenough.com
wrote:

 I have set up a cluster on AWS and am trying a really simple hello world
 program as a test.  The cluster was built using the ec2 scripts that come
 with Spark.  Anyway, I have output the error message (using --verbose)
 below.  The source code is further below that.

 Any help would be greatly appreciated.

 Thanks,

 Eric

 *Error code:*

 r...@ip-xx.xx.xx.xx ~]$ ./spark/bin/spark-submit  --verbose  --class
 com.je.test.Hello --master spark://xx.xx.xx.xx:7077
  Hello-assembly-1.0.jar
 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath
 Using properties file: /root/spark/conf/spark-defaults.conf
 Adding default property: spark.executor.memory=5929m
 Adding default property:
 spark.executor.extraClassPath=/root/ephemeral-hdfs/conf
 Adding default property:
 spark.executor.extraLibraryPath=/root/ephemeral-hdfs/lib/native/
 Using properties file: /root/spark/conf/spark-defaults.conf
 Adding default property: spark.executor.memory=5929m
 Adding default property:
 spark.executor.extraClassPath=/root/ephemeral-hdfs/conf
 Adding default property:
 spark.executor.extraLibraryPath=/root/ephemeral-hdfs/lib/native/
 Parsed arguments:
   master  spark://xx.xx.xx.xx:7077
   deployMode  null
   executorMemory  5929m
   executorCores   null
   totalExecutorCores  null
   propertiesFile  /root/spark/conf/spark-defaults.conf
   extraSparkPropertiesMap()
   driverMemorynull
   driverCores null
   driverExtraClassPathnull
   driverExtraLibraryPath  null
   driverExtraJavaOptions  null
   supervise   false
   queue   null
   numExecutorsnull
   files   null
   pyFiles null
   archivesnull
   mainClass   com.je.test.Hello
   primaryResource file:/root/Hello-assembly-1.0.jar
   namecom.je.test.Hello
   childArgs   []
   jarsnull
   verbose true

 Default properties from /root/spark/conf/spark-defaults.conf:
   spark.executor.extraLibraryPath - /root/ephemeral-hdfs/lib/native/
   spark.executor.memory - 5929m
   spark.executor.extraClassPath - /root/ephemeral-hdfs/conf


 Using properties file: /root/spark/conf/spark-defaults.conf
 Adding default property: spark.executor.memory=5929m
 Adding default property:
 spark.executor.extraClassPath=/root/ephemeral-hdfs/conf
 Adding default property:
 spark.executor.extraLibraryPath=/root/ephemeral-hdfs/lib/native/
 Main class:
 com.je.test.Hello
 Arguments:

 System properties:
 spark.executor.extraLibraryPath - /root/ephemeral-hdfs/lib/native/
 spark.executor.memory - 5929m
 SPARK_SUBMIT - true
 spark.app.name - com.je.test.Hello
 spark.jars - file:/root/Hello-assembly-1.0.jar
 spark.executor.extraClassPath - /root/ephemeral-hdfs/conf
 spark.master - spark://xxx.xx.xx.xxx:7077
 Classpath elements:
 file:/root/Hello-assembly-1.0.jar

 *Actual Error:*
 Exception in thread main java.lang.NullPointerException
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


 *Source Code:*
 package com.je.test


 import org.apache.spark.{SparkConf, SparkContext}

 class Hello {

   def main(args: Array[String]): Unit = {

 val conf = new SparkConf(true)//.set("spark.cassandra.connection.host",
 "xxx.xx.xx.xxx")
 val sc = new SparkContext("spark://xxx.xx.xx.xxx:7077", "Season", conf)

 println("Hello World")

   }
 }
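
 A likely cause here: spark-submit invokes main reflectively as a static method, so defining main inside class Hello (rather than an object) would produce exactly this kind of NullPointerException. A minimal sketch of the object form, keeping the placeholder master URL:

 import org.apache.spark.{SparkConf, SparkContext}

 object Hello {
   def main(args: Array[String]): Unit = {
     val conf = new SparkConf(true)
     val sc = new SparkContext("spark://xxx.xx.xx.xxx:7077", "Season", conf)
     println("Hello World")
     sc.stop()
   }
 }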







Re: Streaming: getting total count over all windows

2014-11-13 Thread jay vyas
I would think this should be done at the application level.
After all, the core functionality of Spark Streaming is to capture RDDs at
some real-time interval and process them -
not to aggregate their results.

But maybe there is a better way...
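
One such way, as a minimal sketch (assuming a StreamingContext named ssc with a checkpoint directory, and the same topic DStream as in the quoted code), is updateStateByKey, which keeps a running total across all batches:

import org.apache.spark.streaming.StreamingContext._   // pair-DStream operations on older releases

ssc.checkpoint("hdfs://.../checkpoints")   // updateStateByKey requires checkpointing; path is a placeholder

val runningScoreCounts = topic.map(r => (r._2, 1L))
  .updateStateByKey[Long] { (newCounts: Seq[Long], total: Option[Long]) =>
    Some(total.getOrElse(0L) + newCounts.sum)
  }

runningScoreCounts.print()   // prints cumulative (score, count) pairs each batch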

On Thu, Nov 13, 2014 at 8:28 PM, SK skrishna...@gmail.com wrote:

 Hi,

 I am using the following code to generate the (score, count) for each
 window:

 val score_count_by_window  = topic.map(r => r._2)   // r._2 is the integer
 score
  .countByValue()

 score_count_by_window.print()

 E.g. output for a window is as follows, which means that within the Dstream
 for that window, there are 2 rdds with score 0; 3 with score 1, and 1 with
 score -1.
 (0, 2)
 (1, 3)
 (-1, 1)

 I would like to get the aggregate count for each score over all windows
 until the program terminates. I tried countByValueAndWindow() but the result is
 the same as countByValue() (i.e. it produces only per-window counts).
 reduceByWindow also does not produce the result I am expecting. What is the
 correct way to sum up the counts over multiple windows?

 thanks










 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-getting-total-count-over-all-windows-tp1.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





-- 
jay vyas


Re: Trouble getting filtering on field correct

2014-10-03 Thread Davies Liu
rdd.filter(lambda line: int(line.split(' ')[8]) >= 125)

On Fri, Oct 3, 2014 at 8:16 PM, Chop thomrog...@att.net wrote:
 Given an RDD  with multiple lines of the form:

 u'207.86.121.131 207.86.121.131 2012-11-27 13:02:17 titlestring 622592 27
 184464'
 (fields are separated by a space)

 What pyspark function/commands do I use to filter out those lines where
 line[8] >= x? (i.e. line[8] >= 125)

 when I use line.split(' ') I get an RDD of each field in each line.

 Thanks



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Trouble-getting-filtering-on-field-correct-tp15728.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: task getting stuck

2014-09-24 Thread Ted Yu
Adding a subject.

bq.   at parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(
ParquetFileReader.java:599)

Looks like there might be some issue reading the Parquet file.

Cheers

On Wed, Sep 24, 2014 at 9:10 AM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Hi Ted,

 See my previous reply to Debasish, all region servers are idle. I don't
 think it's caused by hotspotting.

 Besides, only 6 out of 3000 tasks were stuck, and their inputs are about
 only 80MB each.

 Jianshi

 On Wed, Sep 24, 2014 at 11:58 PM, Ted Yu yuzhih...@gmail.com wrote:

 I was thinking along the same line.

 Jianshi:
 See
 http://hbase.apache.org/book.html#d0e6369

 On Wed, Sep 24, 2014 at 8:56 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 HBase regionservers need to be balanced... you might have some skewness
 in row keys and one regionserver is under pressure... try finding that key
 and replicating it using a random salt

 On Wed, Sep 24, 2014 at 8:51 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi Ted,

 It converts RDD[Edge] to HBase rowkey and columns and insert them to
 HBase (in batch).

 BTW, I found batched Put actually faster than generating HFiles...


 Jianshi

 On Wed, Sep 24, 2014 at 11:49 PM, Ted Yu yuzhih...@gmail.com wrote:

 bq. at com.paypal.risk.rds.dragon.
 storage.hbase.HbaseRDDBatch$$anonfun$batchInsertEdges$3.
 apply(HbaseRDDBatch.scala:179)

 Can you reveal what HbaseRDDBatch.scala does ?

 Cheers

 On Wed, Sep 24, 2014 at 8:46 AM, Jianshi Huang 
 jianshi.hu...@gmail.com wrote:

  One of my big Spark programs always gets stuck at 99%, where a few tasks
  never finish.

 I debugged it by printing out thread stacktraces, and found there're
 workers stuck at parquet.hadoop.ParquetFileReader.readNextRowGroup.

 Anyone had similar problem? I'm using Spark 1.1.0 built for HDP2.1.
 The parquet files are generated by pig using latest parquet-pig-bundle
 v1.6.0rc1.

 From Spark 1.1.0's pom.xml, Spark is using parquet v1.4.3, will this
 be problematic?

  One of the weird behaviors is that another program reads and sorts data
  from the same parquet files and it works fine. The only difference
  seems to be that the buggy program uses foreachPartition and the working program
  uses
  map.

 Here's the full stacktrace:

 Executor task launch worker-3
java.lang.Thread.State: RUNNABLE
 at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
 at
 sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:257)
 at
 sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
 at
 sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
 at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
 at
 org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:335)
 at
 org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)
 at
 org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.readChannelFully(PacketReceiver.java:258)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:209)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:171)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
 at
 org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:173)
 at
 org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:138)
 at
 org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(DFSInputStream.java:683)
 at
 org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:739)
 at
 org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:796)
 at
 org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:837)
 at java.io.DataInputStream.readFully(DataInputStream.java:195)
 at java.io.DataInputStream.readFully(DataInputStream.java:169)
 at
 parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:599)
 at
 parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:360)
 at
 parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:100)
 at
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
 at
 parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
 at
 org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:139)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at
 scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at
 scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
 at
 

Re: task getting stuck

2014-09-24 Thread Debasish Das
Spark SQL reads parquet files fine... did you follow one of these to
read/write parquet from Spark?

http://zenfractal.com/2013/08/21/a-powerful-big-data-trio/
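
As a quick sanity check, a minimal sketch (assuming Spark 1.1's SQLContext.parquetFile; the path is a placeholder) that reads the same files through Spark SQL:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)   // sc: the existing SparkContext
val edges = sqlContext.parquetFile("hdfs://.../edges.parquet")
println(edges.count())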

On Wed, Sep 24, 2014 at 9:29 AM, Ted Yu yuzhih...@gmail.com wrote:

 Adding a subject.

 bq.   at parquet.hadoop.ParquetFileReader$
 ConsecutiveChunkList.readAll(ParquetFileReader.java:599)

 Looks like there might be some issue reading the Parquet file.

 Cheers

 On Wed, Sep 24, 2014 at 9:10 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi Ted,

 See my previous reply to Debasish, all region servers are idle. I don't
 think it's caused by hotspotting.

 Besides, only 6 out of 3000 tasks were stuck, and their inputs are about
 only 80MB each.

 Jianshi

 On Wed, Sep 24, 2014 at 11:58 PM, Ted Yu yuzhih...@gmail.com wrote:

 I was thinking along the same line.

 Jianshi:
 See
 http://hbase.apache.org/book.html#d0e6369

 On Wed, Sep 24, 2014 at 8:56 AM, Debasish Das debasish.da...@gmail.com
 wrote:

  HBase regionservers need to be balanced... you might have some skewness
  in row keys and one regionserver is under pressure... try finding that key
  and replicating it using a random salt

 On Wed, Sep 24, 2014 at 8:51 AM, Jianshi Huang jianshi.hu...@gmail.com
  wrote:

 Hi Ted,

 It converts RDD[Edge] to HBase rowkey and columns and insert them to
 HBase (in batch).

 BTW, I found batched Put actually faster than generating HFiles...


 Jianshi

 On Wed, Sep 24, 2014 at 11:49 PM, Ted Yu yuzhih...@gmail.com wrote:

 bq. at com.paypal.risk.rds.dragon.
 storage.hbase.HbaseRDDBatch$$anonfun$batchInsertEdges$3.
 apply(HbaseRDDBatch.scala:179)

 Can you reveal what HbaseRDDBatch.scala does ?

 Cheers

 On Wed, Sep 24, 2014 at 8:46 AM, Jianshi Huang 
 jianshi.hu...@gmail.com wrote:

 One of my big spark program always get stuck at 99% where a few
 tasks never finishes.

 I debugged it by printing out thread stacktraces, and found there're
 workers stuck at parquet.hadoop.ParquetFileReader.readNextRowGroup.

 Anyone had similar problem? I'm using Spark 1.1.0 built for HDP2.1.
 The parquet files are generated by pig using latest parquet-pig-bundle
 v1.6.0rc1.

 From Spark 1.1.0's pom.xml, Spark is using parquet v1.4.3, will this
 be problematic?

 One of the weird behavior is that another program read and sort data
 read from the same parquet files and it works fine. The only difference
 seems the buggy program uses foreachPartition and the working program 
 uses
 map.

 Here's the full stacktrace:

 Executor task launch worker-3
java.lang.Thread.State: RUNNABLE
 at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
 at
 sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:257)
 at
 sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
 at
 sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
 at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
 at
 org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:335)
 at
 org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)
 at
 org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.readChannelFully(PacketReceiver.java:258)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:209)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:171)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
 at
 org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:173)
 at
 org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:138)
 at
 org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(DFSInputStream.java:683)
 at
 org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:739)
 at
 org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:796)
 at
 org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:837)
 at
 java.io.DataInputStream.readFully(DataInputStream.java:195)
 at
 java.io.DataInputStream.readFully(DataInputStream.java:169)
 at
 parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:599)
 at
 parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:360)
 at
 parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:100)
 at
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
 at
 parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
 at
 org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:139)
 at
 

Re: not getting output from socket connection

2014-07-13 Thread Michael Campbell
Make sure you use local[n] (where n > 1) in your context setup too (if
you're running locally), or you won't get output.
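
For example, a minimal sketch (the app name and the one-second batch interval are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// at least two local threads: one for the socket receiver, one to process the data
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))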


On Sat, Jul 12, 2014 at 11:36 PM, Walrus theCat walrusthe...@gmail.com
wrote:

 Thanks!

 I thought it would get passed through netcat, but given your email, I
 was able to follow this tutorial and get it to work:

 http://docs.oracle.com/javase/tutorial/networking/sockets/clientServer.html




 On Fri, Jul 11, 2014 at 1:31 PM, Sean Owen so...@cloudera.com wrote:

 netcat is listening for a connection on port . It is echoing what
 you type to its console to anything that connects to  and reads.
 That is what Spark streaming does.

 If you yourself connect to  and write, nothing happens except that
 netcat echoes it. This does not cause Spark to somehow get that data.
 nc is only echoing input from the console.

 On Fri, Jul 11, 2014 at 9:25 PM, Walrus theCat walrusthe...@gmail.com
 wrote:
  Hi,
 
  I have a java application that is outputting a string every second.  I'm
  running the wordcount example that comes with Spark 1.0, and running nc
 -lk
  . When I type words into the terminal running netcat, I get counts.
  However, when I write the String onto a socket on port , I don't get
  counts.  I can see the strings showing up in the netcat terminal, but no
  counts from Spark.  If I paste in the string, I get counts.
 
  Any ideas?
 
  Thanks





Re: not getting output from socket connection

2014-07-13 Thread Walrus theCat
Hah, thanks for tidying up the paper trail here, but I was the OP (and
solver) of the recent reduce thread that ended in this solution.


On Sun, Jul 13, 2014 at 4:26 PM, Michael Campbell 
michael.campb...@gmail.com wrote:

 Make sure you use local[n] (where n > 1) in your context setup too (if
 you're running locally), or you won't get output.


 On Sat, Jul 12, 2014 at 11:36 PM, Walrus theCat walrusthe...@gmail.com
 wrote:

 Thanks!

 I thought it would get passed through netcat, but given your email, I
 was able to follow this tutorial and get it to work:


 http://docs.oracle.com/javase/tutorial/networking/sockets/clientServer.html




 On Fri, Jul 11, 2014 at 1:31 PM, Sean Owen so...@cloudera.com wrote:

 netcat is listening for a connection on port . It is echoing what
 you type to its console to anything that connects to  and reads.
 That is what Spark streaming does.

 If you yourself connect to  and write, nothing happens except that
 netcat echoes it. This does not cause Spark to somehow get that data.
 nc is only echoing input from the console.

 On Fri, Jul 11, 2014 at 9:25 PM, Walrus theCat walrusthe...@gmail.com
 wrote:
  Hi,
 
  I have a java application that is outputting a string every second.
  I'm
  running the wordcount example that comes with Spark 1.0, and running
 nc -lk
  . When I type words into the terminal running netcat, I get counts.
  However, when I write the String onto a socket on port , I don't
 get
  counts.  I can see the strings showing up in the netcat terminal, but
 no
  counts from Spark.  If I paste in the string, I get counts.
 
  Any ideas?
 
  Thanks






Re: not getting output from socket connection

2014-07-12 Thread Walrus theCat
Thanks!

I thought it would get passed through netcat, but given your email, I was
able to follow this tutorial and get it to work:

http://docs.oracle.com/javase/tutorial/networking/sockets/clientServer.html
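
The core of that approach, as a minimal sketch in Scala (port and payload are placeholders; Spark's socketTextStream connects to this server and reads the lines it writes):

import java.io.PrintWriter
import java.net.ServerSocket

// the emitting app acts as the server: it listens and writes lines to whoever connects
val server = new ServerSocket(9999)
val socket = server.accept()               // blocks until Spark's receiver connects
val out = new PrintWriter(socket.getOutputStream, true)
while (true) {
  out.println("some line of text")         // whatever the app emits each second
  Thread.sleep(1000)
}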




On Fri, Jul 11, 2014 at 1:31 PM, Sean Owen so...@cloudera.com wrote:

 netcat is listening for a connection on port . It is echoing what
 you type to its console to anything that connects to  and reads.
 That is what Spark streaming does.

 If you yourself connect to  and write, nothing happens except that
 netcat echoes it. This does not cause Spark to somehow get that data.
 nc is only echoing input from the console.

 On Fri, Jul 11, 2014 at 9:25 PM, Walrus theCat walrusthe...@gmail.com
 wrote:
  Hi,
 
  I have a java application that is outputting a string every second.  I'm
  running the wordcount example that comes with Spark 1.0, and running nc
 -lk
  . When I type words into the terminal running netcat, I get counts.
  However, when I write the String onto a socket on port , I don't get
  counts.  I can see the strings showing up in the netcat terminal, but no
  counts from Spark.  If I paste in the string, I get counts.
 
  Any ideas?
 
  Thanks



Re: not getting output from socket connection

2014-07-11 Thread Walrus theCat
I forgot to add that I get the same behavior if I tail -f | nc localhost
 on a log file.


On Fri, Jul 11, 2014 at 1:25 PM, Walrus theCat walrusthe...@gmail.com
wrote:

 Hi,

 I have a java application that is outputting a string every second.  I'm
 running the wordcount example that comes with Spark 1.0, and running nc -lk
 . When I type words into the terminal running netcat, I get counts.
 However, when I write the String onto a socket on port , I don't get
 counts.  I can see the strings showing up in the netcat terminal, but no
 counts from Spark.  If I paste in the string, I get counts.

 Any ideas?

 Thanks



Re: not getting output from socket connection

2014-07-11 Thread Sean Owen
netcat is listening for a connection on port . It is echoing what
you type to its console to anything that connects to  and reads.
That is what Spark streaming does.

If you yourself connect to  and write, nothing happens except that
netcat echoes it. This does not cause Spark to somehow get that data.
nc is only echoing input from the console.

On Fri, Jul 11, 2014 at 9:25 PM, Walrus theCat walrusthe...@gmail.com wrote:
 Hi,

 I have a java application that is outputting a string every second.  I'm
 running the wordcount example that comes with Spark 1.0, and running nc -lk
 . When I type words into the terminal running netcat, I get counts.
 However, when I write the String onto a socket on port , I don't get
 counts.  I can see the strings showing up in the netcat terminal, but no
 counts from Spark.  If I paste in the string, I get counts.

 Any ideas?

 Thanks


Re: Not getting it

2014-03-28 Thread Sonal Goyal
Have you tried setting the partitioning ?
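
For example, a minimal sketch (using the keyed fileA/fileB from the quoted code below; the partition count is an arbitrary placeholder):

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._   // pair-RDD operations

// co-partition both datasets so matching keys land in the same partition before the join
val partitioner = new HashPartitioner(256)
val a = fileA.partitionBy(partitioner)
val b = fileB.partitionBy(partitioner)
val joined = a.join(b)   // both sides share the partitioner, so the join adds no extra shuffle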

Best Regards,
Sonal
Nube Technologies http://www.nubetech.co

http://in.linkedin.com/in/sonalgoyal




On Thu, Mar 27, 2014 at 10:04 AM, lannyripple lanny.rip...@gmail.com wrote:

 Hi all,

 I've got something which I think should be straightforward but it's not so
 I'm not getting it.

 I have an 8 node spark 0.9.0 cluster also running HDFS.  Workers have 16g
 of
 memory using 8 cores.

 In HDFS I have a CSV file of 110M lines of 9 columns (e.g.,
 [key,a,b,c...]).
 I have another file of 25K lines containing some number of keys which might
 be in my CSV file.  (Yes, I know I should use an RDBMS or shark or
 something.  I'll get to that but this is toy problem that I'm using to get
 some intuition with spark.)

 Working on each file individually spark has no problem manipulating the
 files.  If I try and join or union+filter though I can't seem to find the
 join of the two files.  Code is along the lines of

 val fileA =
 sc.textFile("hdfs://.../fileA_110M.csv").map{_.split(",")}.keyBy{_(0)}
 val fileB = sc.textFile("hdfs://.../fileB_25k.csv").keyBy{x => x}

 And trying things like fileA.join(fileB) gives me heap OOM.  Trying

 (fileA ++ fileB.map{case (k,v) => (k, Array(v))}).groupBy{_._1}.filter{case
 (k, (_, xs)) => xs.exists{_.length == 1}

 just causes spark to freeze.  (In all the cases I'm trying I just use a
 final .count to force the results.)

 I suspect I'm missing something fundamental about bringing the keyed data
 together into the same partitions so it can be efficiently joined but I've
 given up for now.  If anyone can shed some light (beyond "No really. Use
 Shark.") on what I'm not understanding, it would be most helpful.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Not-getting-it-tp3316.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Not getting it

2014-03-28 Thread lannyripple
I've played around with it.  The CSV file looks like it gives 130
partitions.  I'm assuming that's the standard 64MB split size for HDFS
files.  I have increased number of partitions and number of tasks for
things like groupByKey and such.  Usually I start blowing up on GC
Overlimit or sometimes Heap OOM.  I recently tried throwing coalesce with
shuffle = true,  into the mix thinking it would bring the keys into the
same partition. E.g.,

(fileA ++ fileB.map{case (k,v) => (k,
Array(v)}).coalesce(fileA.partitions.length + fileB.partitions.length,
shuffle = true).groupBy...

(Which should effectively be imitating map-reduce) but I see GC Overlimit
when I do that.

I've got a stock install with num cores and worker memory set as mentioned
but even something like this

fileA.sortByKey().map{_ => 1}.reduce{_ + _}

blows up with GC Overlimit (as did .count instead of the by-hand count).

fileA.count

works.  It seems to be able to load the file as an RDD but not manipulate
it.




On Fri, Mar 28, 2014 at 3:04 AM, Sonal Goyal [via Apache Spark User List] 
ml-node+s1001560n3417...@n3.nabble.com wrote:

 Have you tried setting the partitioning ?

 Best Regards,
 Sonal
 Nube Technologies http://www.nubetech.co

 http://in.linkedin.com/in/sonalgoyal




 On Thu, Mar 27, 2014 at 10:04 AM, lannyripple [hidden email] wrote:

 Hi all,

 I've got something which I think should be straightforward but it's not so
 I'm not getting it.

 I have an 8 node spark 0.9.0 cluster also running HDFS.  Workers have 16g
 of
 memory using 8 cores.

 In HDFS I have a CSV file of 110M lines of 9 columns (e.g.,
 [key,a,b,c...]).
 I have another file of 25K lines containing some number of keys which
 might
 be in my CSV file.  (Yes, I know I should use an RDBMS or shark or
 something.  I'll get to that but this is toy problem that I'm using to get
 some intuition with spark.)

 Working on each file individually spark has no problem manipulating the
 files.  If I try and join or union+filter though I can't seem to find the
 join of the two files.  Code is along the lines of

 val fileA =
 sc.textFile(hdfs://.../fileA_110M.csv).map{_.split(,)}.keyBy{_(0)}
 val fileB = sc.textFile(hdfs://.../fileB_25k.csv).keyBy{x = x}

 And trying things like fileA.join(fileB) gives me heap OOM.  Trying

 (fileA ++ fileB.map{case (k,v) = (k,
 Array(v))}).groupBy{_._1}.filter{case
 (k, (_, xs)) = xs.exists{_.length == 1}

 just causes spark to freeze.  (In all the cases I'm trying I just use a
 final .count to force the results.)

 I suspect I'm missing something fundamental about bringing the keyed data
 together into the same partitions so it can be efficiently joined but I've
 given up for now.  If anyone can shed some light (Beyond, No really.  Use
 shark.) on what I'm not understanding it would be most helpful.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Not-getting-it-tp3316.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.









--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Not-getting-it-tp3316p3437.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Not getting it

2014-03-28 Thread lannyripple
Ok.  Based on Sonal's message I dived more into memory and partitioning and
got it to work.

For the CSV file I used 1024 partitions [textFile(path, 1024)] which cut
the partition size down to 8MB (based on standard HDFS 64MB splits).  For
the key file I also adjusted partitions to use about 8MB.  This was still
blowing up with GC Overlimit and Heap OOM with join.  I then set SPARK_MEM
(which is hard to tease out of the documentation) to 4g and the join
completed.

Going back to find SPARK_MEM I found this the best explanation --
https://groups.google.com/forum/#!searchin/spark-users/SPARK_MEM/spark-users/ou6cJMlBj_M/NlBHYDjG_NYJ

At a guess setting SPARK_MEM did more than changing the partitions.
 Something to play around.
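
Putting that together, a minimal sketch of the setup described above (paths are placeholders; SPARK_MEM=4g exported before launching):

import org.apache.spark.SparkContext._   // pair-RDD operations

// many small partitions keep each task's working set modest
val fileA = sc.textFile("hdfs://.../fileA_110M.csv", 1024)
  .map(_.split(","))
  .keyBy(_(0))

val fileB = sc.textFile("hdfs://.../fileB_25k.csv")
  .keyBy(x => x)

println(fileA.join(fileB).count())   // count forces the join to run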


 On Fri, Mar 28, 2014 at 10:17 AM, Lanny Ripple lanny.rip...@gmail.com wrote:

 I've played around with it.  The CSV file looks like it gives 130
 partitions.  I'm assuming that's from the standard 64MB split size for HDFS
 files.  I have increased the number of partitions and the number of tasks for
 things like groupByKey and such.  Usually I start blowing up on GC overhead
 limit exceeded or sometimes heap OOM.  I recently tried throwing coalesce
 with shuffle = true into the mix, thinking it would bring the keys into the
 same partition, e.g.,

 (fileA ++ fileB.map{case (k, v) => (k, Array(v))})
   .coalesce(fileA.partitions.length + fileB.partitions.length, shuffle = true)
   .groupBy...

 (which should effectively imitate map-reduce), but I see GC overhead limit
 exceeded when I do that.
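
 For what it's worth, coalesce with shuffle = true spreads elements evenly
 across partitions rather than grouping them by key (as far as I can tell), so
 matching keys still don't necessarily land together.  The key-based way to
 co-locate them before a join is partitionBy with a HashPartitioner.  A rough
 sketch, with fileA and fileB keyed as in the original post and an
 illustrative partition count:

     import org.apache.spark.HashPartitioner
     import org.apache.spark.SparkContext._  // pair-RDD implicits (partitionBy, join)

     // Hash-partition both RDDs with the same partitioner so matching keys
     // land in the same partition; the subsequent join then doesn't need to
     // re-shuffle either side.
     val part = new HashPartitioner(1024)

     val keyedA = fileA.partitionBy(part)  // (key, Array[String]) rows from the CSV
     val keyedB = fileB.partitionBy(part)  // (key, key) from the key file

     keyedA.join(keyedB).count()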

 I've got a stock install with the number of cores and worker memory set as
 mentioned, but even something like this

 fileA.sortByKey().map{_ => 1}.reduce{_ + _}

 blows up with GC overhead limit exceeded (as did .count instead of the by-hand count).

 fileA.count

 works.  It seems to be able to load the file as an RDD but not manipulate
 it.




 On Fri, Mar 28, 2014 at 3:04 AM, Sonal Goyal [via Apache Spark User List]
 ml-node+s1001560n3417...@n3.nabble.com wrote:

 Have you tried setting the partitioning?

 Best Regards,
 Sonal
 Nube Technologies http://www.nubetech.co

 http://in.linkedin.com/in/sonalgoyal




 On Thu, Mar 27, 2014 at 10:04 AM, lannyripple [hidden email] wrote:

 Hi all,

 I've got something which I think should be straightforward, but it's not, so
 I'm not getting it.

 I have an 8-node Spark 0.9.0 cluster also running HDFS.  Workers have 16g of
 memory and 8 cores.

 In HDFS I have a CSV file of 110M lines of 9 columns (e.g.,
 [key,a,b,c...]).
 I have another file of 25K lines containing some number of keys which
 might
 be in my CSV file.  (Yes, I know I should use an RDBMS or Shark or
 something.  I'll get to that, but this is a toy problem that I'm using to get
 some intuition with Spark.)

 Working on each file individually, Spark has no problem manipulating the
 files.  If I try a join or union+filter, though, I can't seem to compute the
 join of the two files.  The code is along the lines of

 val fileA =
   sc.textFile("hdfs://.../fileA_110M.csv").map{_.split(",")}.keyBy{_(0)}
 val fileB = sc.textFile("hdfs://.../fileB_25k.csv").keyBy{x => x}

 And trying things like fileA.join(fileB) gives me heap OOM.  Trying

 (fileA ++ fileB.map{case (k, v) => (k, Array(v))})
   .groupBy{_._1}
   .filter{case (k, grouped) => grouped.exists{_._2.length == 1}}

 just causes Spark to freeze.  (In all the cases I'm trying, I just use a
 final .count to force the results.)

 I suspect I'm missing something fundamental about bringing the keyed data
 together into the same partitions so it can be efficiently joined, but I've
 given up for now.  If anyone can shed some light (beyond "No, really.  Use
 Shark.") on what I'm not understanding, it would be most helpful.










--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Not-getting-it-tp3316p3438.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.