Re: dynamically infer json data not working as expected

2024-08-08 Thread Perez
Also, I checked your code, but it will give the same result even if I do
sampling, because the schema of the "data" attribute is not fixed.

Any suggestions?


On Thu, Aug 8, 2024 at 12:34 PM Perez  wrote:

> Hi Mich,
>
> Thanks a lot for your answer but there is one more scenario to it.
>
> The schema of the data attribute inside the steps column is not fixed. For
> some records I see it as a struct, and for others I see it as an array of
> objects.
>
> So in the end Spark treats it as a string, presumably because it cannot
> reconcile the two types.
>
> So how can we tackle this?
>
> Thanks,
>
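One way to tackle a field that is sometimes a single object and sometimes an
array of objects is to keep it as a raw JSON string and parse it into one common
shape explicitly, rather than letting schema inference pick a type. A minimal
PySpark sketch; the column names and sample records are assumptions, not the
actual data:

```
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import ArrayType, MapType, StringType

spark = SparkSession.builder.appName("mixed-json-field").getOrCreate()

# Two made-up records: "data" is a single object in one and an array in the other.
rows = [
    ('{"steps": {"data": {"k": "v1"}}}',),
    ('{"steps": {"data": [{"k": "v2"}, {"k": "v3"}]}}',),
]
raw = spark.createDataFrame(rows, ["json"])

# Pull "data" out as a raw JSON string so Spark never has to choose one type.
with_str = raw.withColumn("data_str", F.get_json_object("json", "$.steps.data"))

arr_type = ArrayType(MapType(StringType(), StringType()))
obj_type = MapType(StringType(), StringType())

# Parse both ways and coalesce: single objects are wrapped into a one-element
# array, so every row ends up with the same array-of-map type.
parsed = with_str.withColumn(
    "data",
    F.coalesce(
        F.from_json("data_str", arr_type),           # rows where data is an array
        F.array(F.from_json("data_str", obj_type)),  # rows where data is an object
    ),
)
parsed.select("data").show(truncate=False)
```

The same idea works with a StructType instead of a MapType if the keys are known.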


Re: dynamically infer json data not working as expected

2024-08-08 Thread Perez
Hi Mich,

Thanks a lot for your answer but there is one more scenario to it.

The schema of the data attribute inside the steps column is not fixed. For
some records I see it as a struct, and for others I see it as an array of
objects.

So in the end Spark treats it as a string, presumably because it cannot
reconcile the two types.

So how can we tackle this?

Thanks,


Re: dynamically infer json data not working as expected

2024-08-05 Thread Perez
https://stackoverflow.com/questions/78835509/dynamically-infer-schema-of-json-data-using-pyspark

Any help would be appreciated.

Thanks,

On Mon, Aug 5, 2024 at 10:35 PM Perez  wrote:

> Hello everyone,
>
> I have described my problem in this Stack Overflow post:
>
>


dynamically infer json data not working as expected

2024-08-05 Thread Perez
Hello everyone,

I have described my problem in this Stack Overflow post:
https://stackoverflow.com/questions/78835509/dynamically-infer-schema-of-json-data-using-pyspark


AWS Glue and Python

2024-06-26 Thread Perez
Hi Team, I am facing an issue here:

https://stackoverflow.com/questions/78673228/unable-to-read-text-file-in-glue-job

TIA


Re: Unable to load MongoDB atlas data via PySpark because of BsonString error

2024-06-09 Thread Perez
Hi Team,

Any help in this matter would be greatly appreciated.

TIA

On Sun, Jun 9, 2024 at 11:26 AM Perez  wrote:

> Hi Team,
>
> this is the problem
> https://stackoverflow.com/questions/78593858/unable-to-load-mongodb-atlas-data-via-pyspark-jdbc-in-glue
>
> I can't go ahead with the *StructType* approach since my input record is huge,
> and if the underlying attributes are added or removed my code might fail.
>
> I can't change the source data either.
>
> The only thing I can think of is loading via Python client with multiple
> threads but do let me know if there is another solution for this.
>
> TIA
>


Unable to load MongoDB atlas data via PySpark because of BsonString error

2024-06-08 Thread Perez
Hi Team,

this is the problem
https://stackoverflow.com/questions/78593858/unable-to-load-mongodb-atlas-data-via-pyspark-jdbc-in-glue

I can't go ahead with the *StructType* approach since my input record is huge,
and if the underlying attributes are added or removed my code might fail.

I can't change the source data either.

The only thing I can think of is loading via Python client with multiple
threads but do let me know if there is another solution for this.

TIA
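One option, if the hand-written StructType is the only blocker, is to let the
MongoDB Spark connector infer the schema from a sample of documents instead of
declaring it yourself. A rough sketch; the URI is a placeholder, and the exact
option names (for example sampleSize) depend on the connector version in use:

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("mongo-schema-sample").getOrCreate()

# Let the connector sample documents and infer the schema, instead of
# maintaining a large StructType by hand.
df = (
    spark.read.format("com.mongodb.spark.sql.DefaultSource")
    .option("uri", "mongodb://username:password@host:port/database.collection")
    .option("sampleSize", 100000)  # sample more documents for a fuller schema
    .load()
)
df.printSchema()
```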


Re: Do we need partitioning while loading data from JDBC sources?

2024-06-06 Thread Perez
Also, can I take my lower bound starting from 1, or does it have to be an index?

On Thu, Jun 6, 2024 at 8:42 PM Perez  wrote:

> Thanks again, Mich. It gives a clear picture, but I again have a couple of
> doubts:
>
> 1) I know that there will be multiple threads executed with a segment size
> of 10 each until the upper bound is reached, but I didn't understand this
> part of the code exactly: segments = [(i, min(i + segment_size,
> upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]
>
> 2) Also, performing a union on these small dataframes won't impact
> performance, right? Since Spark has to shuffle and combine less data from
> these dataframes?
>
>
> On Thu, Jun 6, 2024 at 3:53 PM Mich Talebzadeh 
> wrote:
>
>> well you can dynamically determine the upper bound by first querying the
>> database to find the maximum value of the partition column and use it as
>> the upper bound for your partitioning logic.
>>
>> def get_max_value(spark, mongo_config, column_name):
>> max_value_df =
>> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>> max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
>> return max_value
>>
>> # Define your MongoDB config without the bounds first
>> mongo_config_base = {
>> "uri": "mongodb://username:password@host:port/database.collection",
>> "partitionColumn": "_id"
>> }
>>
>> # Fetch the dynamic upper bound
>> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>>
>> # Define your segment size
>> segment_size = 10
>> lower_bound = 0
>> segments = [(i, min(i + segment_size, upper_bound)) for i in
>> range(lower_bound, upper_bound, segment_size)]
>>
>> Then you need to aggregate the DataFrames from multiple threads. When loading data in
>> parallel, each thread will load a segment of data into its own DataFrame.
>> To aggregate all these DataFrames into a single DataFrame, you can use the
>> *union* method in PySpark.
>>
>> from concurrent.futures import ThreadPoolExecutor, as_completed
>> from pyspark.sql import SparkSession
>>
>> def extract_data_from_mongodb(mongo_config):
>> df =
>> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>> return df
>>
>> # Function to get the maximum value of the partition column
>> def get_max_value(spark, mongo_config, column_name):
>> max_value_df =
>> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>> max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
>> return max_value
>>
>> # MongoDB configuration without bounds
>> mongo_config_base = {
>> "uri": "mongodb://username:password@host:port/database.collection",
>> "partitionColumn": "_id"
>> }
>>
>> # Initialize Spark session
>> spark = SparkSession.builder \
>> .appName("MongoDBDataLoad") \
>> .config("spark.mongodb.input.uri", 
>> "mongodb://username:password@host:port/database.collection")
>> \
>> .getOrCreate()
>>
>> # Fetch the dynamic upper bound
>> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>>
>> # Define your segment size
>> segment_size = 10
>> lower_bound = 0
>> segments = [(i, min(i + segment_size, upper_bound)) for i in
>> range(lower_bound, upper_bound, segment_size)]
>>
>> # Function to load a segment
>> def load_segment(segment):
>> segment_lower_bound, segment_upper_bound = segment
>> mongo_config = mongo_config_base.copy()
>> mongo_config["lowerBound"] = str(segment_lower_bound)
>> mongo_config["upperBound"] = str(segment_upper_bound)
>> return extract_data_from_mongodb(mongo_config)
>>
>> # Collect all DataFrames from threads
>> all_dfs = []
>>
>> with ThreadPoolExecutor() as executor:
>> futures = [executor.submit(load_segment, segment) for segment in
>> segments]
>> for future in as_completed(futures):
>> try:
>> df_segment = future.result()
>> all_dfs.append(df_segment)
>> except Exception as e:
>> print(f"Error: {e}")
>>
>> # Union all DataFrames into a single DataFrame
>> if all_dfs:
>> final_df = all_dfs[0]
>> for df in all_dfs[1:]:
>> final_df = final_df.union(df)

Re: Do we need partitioning while loading data from JDBC sources?

2024-06-06 Thread Perez
Thanks again, Mich. It gives a clear picture, but I again have a couple of
doubts:

1) I know that there will be multiple threads executed with a segment size of 10
each until the upper bound is reached, but I didn't understand this part of the
code exactly: segments = [(i, min(i + segment_size,
upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]

2) Also, performing a union on these small dataframes won't impact performance,
right? Since Spark has to shuffle and combine less data from these
dataframes?
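For what it is worth, the segments comprehension in question 1) is easiest to
see with small numbers; a quick illustration (the bounds here are made up):

```
lower_bound, upper_bound, segment_size = 0, 35, 10

# Each tuple is a (lowerBound, upperBound) pair handed to one thread; min()
# clips the last segment so it does not overshoot the overall upper bound.
segments = [(i, min(i + segment_size, upper_bound))
            for i in range(lower_bound, upper_bound, segment_size)]
print(segments)  # [(0, 10), (10, 20), (20, 30), (30, 35)]
```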


On Thu, Jun 6, 2024 at 3:53 PM Mich Talebzadeh 
wrote:

> well you can dynamically determine the upper bound by first querying the
> database to find the maximum value of the partition column and use it as
> the upper bound for your partitioning logic.
>
> def get_max_value(spark, mongo_config, column_name):
> max_value_df =
> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
> max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
> return max_value
>
> # Define your MongoDB config without the bounds first
> mongo_config_base = {
> "uri": "mongodb://username:password@host:port/database.collection",
> "partitionColumn": "_id"
> }
>
> # Fetch the dynamic upper bound
> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>
> # Define your segment size
> segment_size = 10
> lower_bound = 0
> segments = [(i, min(i + segment_size, upper_bound)) for i in
> range(lower_bound, upper_bound, segment_size)]
>
> Then you need to aggregate the DataFrames from multiple threads. When loading data in
> parallel, each thread will load a segment of data into its own DataFrame.
> To aggregate all these DataFrames into a single DataFrame, you can use the
> *union* method in PySpark.
>
> from concurrent.futures import ThreadPoolExecutor, as_completed
> from pyspark.sql import SparkSession
>
> def extract_data_from_mongodb(mongo_config):
> df =
> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
> return df
>
> # Function to get the maximum value of the partition column
> def get_max_value(spark, mongo_config, column_name):
> max_value_df =
> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
> max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
> return max_value
>
> # MongoDB configuration without bounds
> mongo_config_base = {
> "uri": "mongodb://username:password@host:port/database.collection",
> "partitionColumn": "_id"
> }
>
> # Initialize Spark session
> spark = SparkSession.builder \
> .appName("MongoDBDataLoad") \
> .config("spark.mongodb.input.uri", 
> "mongodb://username:password@host:port/database.collection")
> \
> .getOrCreate()
>
> # Fetch the dynamic upper bound
> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>
> # Define your segment size
> segment_size = 10
> lower_bound = 0
> segments = [(i, min(i + segment_size, upper_bound)) for i in
> range(lower_bound, upper_bound, segment_size)]
>
> # Function to load a segment
> def load_segment(segment):
> segment_lower_bound, segment_upper_bound = segment
> mongo_config = mongo_config_base.copy()
> mongo_config["lowerBound"] = str(segment_lower_bound)
> mongo_config["upperBound"] = str(segment_upper_bound)
> return extract_data_from_mongodb(mongo_config)
>
> # Collect all DataFrames from threads
> all_dfs = []
>
> with ThreadPoolExecutor() as executor:
> futures = [executor.submit(load_segment, segment) for segment in
> segments]
> for future in as_completed(futures):
> try:
> df_segment = future.result()
> all_dfs.append(df_segment)
> except Exception as e:
> print(f"Error: {e}")
>
> # Union all DataFrames into a single DataFrame
> if all_dfs:
> final_df = all_dfs[0]
> for df in all_dfs[1:]:
> final_df = final_df.union(df)
>
> # Proceed with your final DataFrame
> final_df.show()
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
> London <https://en.wikipedia.org/wiki/Imperial_College_London>
> London, United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>

Re: Do we need partitioning while loading data from JDBC sources?

2024-06-06 Thread Perez
Thanks, Mich, for your response. However, I have a couple of doubts, as below:

1) I am trying to load the data for an incremental batch, so I am not sure
what my upper bound would be. What can we do in that case?
2) As each thread loads its segment of data into a dataframe, what should I do
if I want to aggregate all the data from all the threads into a single
dataframe? Keep appending to a dataframe as each segment comes in?


On Thu, Jun 6, 2024 at 1:54 PM Mich Talebzadeh 
wrote:

> Yes, partitioning and parallel loading can significantly improve the
> performance of data extraction from JDBC sources or databases like MongoDB.
> This approach can leverage Spark's distributed computing capabilities,
> allowing you to load data in parallel, thus speeding up the overall data
> loading process.
>
> When loading data from JDBC sources, specifying partitioning options
> allows Spark to parallelize the data read operation. Here's how you can do
> it for a JDBC source:
>
> Something like below given the information provided
>
> from pyspark.sql import SparkSession
> from concurrent.futures import ThreadPoolExecutor, as_completed
>
> def extract_data_from_mongodb(mongo_config):
> df =
> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
> return df
>
> # MongoDB configuration
> mongo_config_template = {
> "uri": "mongodb://username:password@host:port/database.collection",
> "partitionColumn": "_id",
> "lowerBound": None,
> "upperBound": None
> }
>
> lower_bound = 0
> upper_bound = 200
> segment_size = 10
>
> # Create segments
> segments = [(i, min(i + segment_size, upper_bound)) for i in
> range(lower_bound, upper_bound, segment_size)]
>
> # Initialize Spark session
> spark = SparkSession.builder \
> .appName("MongoDBDataLoad") \
> .config("spark.mongodb.input.uri", 
> "mongodb://username:password@host:port/database.collection")
> \
> .getOrCreate()
>
> # Extract data in parallel using ThreadPoolExecutor
> def load_segment(segment):
> segment_lower_bound, segment_upper_bound = segment
> mongo_config = mongo_config_template.copy()
> mongo_config["lowerBound"] = str(segment_lower_bound)
> mongo_config["upperBound"] = str(segment_upper_bound)
> return extract_data_from_mongodb(mongo_config)
>
> with ThreadPoolExecutor() as executor:
> futures = [executor.submit(load_segment, segment) for segment in
> segments]
> for future in as_completed(futures):
> try:
> df_segment = future.result()
> # Process df_segment as needed
> except Exception as e:
> print(f"Error: {e}")
>
>
> ThreadPoolExecutor enables parallel execution of tasks using multiple
> threads. Each thread can be responsible for loading a segment of the data.
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
> London <https://en.wikipedia.org/wiki/Imperial_College_London> (voted 2nd
> best university in the world after MIT https://lnkd.in/eCPt6KTj)
> London, United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
> Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>
>
> On Thu, 6 Jun 2024 at 00:46, Perez  wrote:
>
>> Hello experts,
>>
>> I was just wondering if I could leverage the approach below to expedite the
>> data loading process in Spark.
>>
>>
>> def extract_data_from_mongodb(mongo_config):
>>     df = glueContext.create_dynamic_frame.from_options(
>>         connection_type="mongodb", connection_options=mongo_config
>>     )
>>     return df
>>
>> mongo_config = {
>>     "connection.uri": "mongodb://url", "database": "", "collection": "",
>>     "username": "", "password": "", "partitionColumn": "_id",
>>     "lowerBound": str(lower_bound), "upperBound": str(upper_bound),
>> }
>>
>> lower_bound = 0
>> upper_bound = 200
>> segment_size = 10
>> segments = [(i, min(i + segment_size, upper_bound))
>>             for i in range(lower_bound, upper_bound, segment_size)]
>>
>> with ThreadPoolExecutor() as executor:
>>     futures = [executor.submit(execution, segment) for segment in segments]
>>     for future in as_completed(futures):
>>         try:
>>             future.result()
>>         except Exception as e:
>>             print(f"Error: {e}")
>>
>> I am trying to leverage parallel threads to pull data in parallel. Is this
>> approach effective?
>>
>
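A side note for readers of this thread: for a plain JDBC source (as opposed to
the MongoDB connector used above), Spark can do the same parallel range read
natively through the partitionColumn/numPartitions options, with no thread pool
needed. A minimal sketch with placeholder connection details (the JDBC driver
must be on the classpath):

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jdbc-parallel-read").getOrCreate()

df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://host:5432/mydb")   # placeholder URL
    .option("dbtable", "public.events")                  # placeholder table
    .option("user", "username")
    .option("password", "password")
    # These four options make Spark issue numPartitions parallel range queries
    # on the (numeric) partition column instead of one full-table scan.
    .option("partitionColumn", "id")
    .option("lowerBound", "0")
    .option("upperBound", "200")
    .option("numPartitions", "20")
    .load()
)
print(df.rdd.getNumPartitions())
```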


Re: Terabytes data processing via Glue

2024-06-05 Thread Perez
Thanks Nitin and Russel for your responses. Much appreciated.

On Mon, Jun 3, 2024 at 9:47 PM Russell Jurney 
wrote:

> You could use either Glue or Spark for your job. Use what you’re more
> comfortable with.
>
> Thanks,
> Russell Jurney @rjurney <http://twitter.com/rjurney>
> russell.jur...@gmail.com LI <http://linkedin.com/in/russelljurney> FB
> <http://facebook.com/jurney> datasyndrome.com
>
>
> On Sun, Jun 2, 2024 at 9:59 PM Perez  wrote:
>
>> Hello,
>>
>> Can I get some suggestions?
>>
>> On Sat, Jun 1, 2024 at 1:18 PM Perez  wrote:
>>
>>> Hi Team,
>>>
>>> I am planning to load and process around 2 TB of historical data. For that
>>> purpose I was planning to go ahead with Glue.
>>>
>>> So is it OK to use Glue if I calculate the DPUs needed correctly, or
>>> should I go with EMR?
>>>
>>> This will be a one-time activity.
>>>
>>>
>>> TIA
>>>
>>


Do we need partitioning while loading data from JDBC sources?

2024-06-05 Thread Perez
Hello experts,

I was just wondering if I could leverage the approach below to expedite the
data loading process in Spark.


def extract_data_from_mongodb(mongo_config):
    df = glueContext.create_dynamic_frame.from_options(
        connection_type="mongodb", connection_options=mongo_config
    )
    return df

mongo_config = {
    "connection.uri": "mongodb://url", "database": "", "collection": "",
    "username": "", "password": "", "partitionColumn": "_id",
    "lowerBound": str(lower_bound), "upperBound": str(upper_bound),
}

lower_bound = 0
upper_bound = 200
segment_size = 10
segments = [(i, min(i + segment_size, upper_bound))
            for i in range(lower_bound, upper_bound, segment_size)]

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(execution, segment) for segment in segments]
    for future in as_completed(futures):
        try:
            future.result()
        except Exception as e:
            print(f"Error: {e}")

I am trying to leverage parallel threads to pull data in parallel. Is this
approach effective?


Re: Terabytes data processing via Glue

2024-06-02 Thread Perez
Hello,

Can I get some suggestions?

On Sat, Jun 1, 2024 at 1:18 PM Perez  wrote:

> Hi Team,
>
> I am planning to load and process around 2 TB of historical data. For that
> purpose I was planning to go ahead with Glue.
>
> So is it OK to use Glue if I calculate the DPUs needed correctly, or
> should I go with EMR?
>
> This will be a one-time activity.
>
>
> TIA
>


Terabytes data processing via Glue

2024-06-01 Thread Perez
Hi Team,

I am planning to load and process around 2 TB of historical data. For that
purpose I was planning to go ahead with Glue.

So is it OK to use Glue if I calculate the DPUs needed correctly, or
should I go with EMR?

This will be a one-time activity.


TIA


Tox and Pyspark

2024-05-28 Thread Perez
Hi Team,

I need help with this
https://stackoverflow.com/questions/78547676/tox-with-pyspark


Re: OOM concern

2024-05-28 Thread Perez
Thanks Mich for the detailed explanation.

On Tue, May 28, 2024 at 9:53 PM Mich Talebzadeh 
wrote:

> Russell mentioned some of these issues before. So, in short, your mileage
> varies. For a 100 GB data transfer, the speed difference between Glue and
> EMR might not be significant, especially considering the benefits of Glue's
> managed service aspects. However, for much larger datasets or scenarios
> where speed is critical, EMR's customization options might provide a slight
> edge.
>
> My recommendation is to test and compare: if speed is a concern, consider
> running a test job with both Glue and EMR (if feasible) on a smaller subset
> of your data to compare transfer times and costs in your specific
> environment. Focus on benefits: if the speed difference with Glue is
> minimal but it offers significant benefits in terms of management and cost
> for your use case, Glue might still be the preferable option. Also,
> bandwidth: ensure your network bandwidth between the database and S3 is
> sufficient to handle the data transfer rate, regardless of the service you
> choose.
>
>
> HTH
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
> London <https://en.wikipedia.org/wiki/Imperial_College_London>
> London, United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
> Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>
>
> On Tue, 28 May 2024 at 16:40, Perez  wrote:
>
>> Thanks Mich.
>>
>> Yes, I agree on the costing part, but how would the data transfer speed be
>> impacted? Is it because Glue takes some time to initialize the underlying
>> resources before processing the data?
>>
>>
>> On Tue, May 28, 2024 at 2:23 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Your mileage varies as usual
>>>
>>> Glue with DPUs seems like a strong contender for your data transfer
>>> needs based on the simplicity, scalability, and managed service aspects.
>>> However, if data transfer speed is critical or costs become a concern after
>>> testing, consider EMR as an alternative.
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>>> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial
>>> College London <https://en.wikipedia.org/wiki/Imperial_College_London>
>>> London, United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed . It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions (Werner
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>>
>>>
>>> On Tue, 28 May 2024 at 09:04, Perez  wrote:
>>>
>>>> Thank you everyone for your response.
>>>>
>>>> I am not getting any errors as of now. I am just trying to choose the
>>>> right tool for my task, which is data loading from an external source into
>>>> S3 via Glue/EMR.
>>>>
>>>> I think a Glue job would be the best fit for me because I can calculate the
>>>> DPUs needed (maybe keeping some extra buffer), so I just wanted to check if
>>>> there are any edge cases I need to consider.
>>>>
>>>>
>>>> On Tue, May 28, 2024 at 5:39 AM Russell Jurney <
>>>> russell.jur...@gmail.com> wrote:
>>>>
>>>>> If you’re using EMR and Spark, you need to choose nodes with enough
>>>>> RAM to accommodate any given partition in your data or you can get an OOM
>>>>> error. Not sure if this job involves a reduce, but I would choose a

Re: OOM concern

2024-05-28 Thread Perez
Thanks Mich.

Yes, I agree on the costing part, but how would the data transfer speed be
impacted? Is it because Glue takes some time to initialize the underlying
resources before processing the data?


On Tue, May 28, 2024 at 2:23 PM Mich Talebzadeh 
wrote:

> Your mileage varies as usual
>
> Glue with DPUs seems like a strong contender for your data transfer needs
> based on the simplicity, scalability, and managed service aspects. However,
> if data transfer speed is critical or costs become a concern after testing,
> consider EMR as an alternative.
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
> London <https://en.wikipedia.org/wiki/Imperial_College_London>
> London, United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
> Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>
>
> On Tue, 28 May 2024 at 09:04, Perez  wrote:
>
>> Thank you everyone for your response.
>>
>> I am not getting any errors as of now. I am just trying to choose the
>> right tool for my task, which is data loading from an external source into
>> S3 via Glue/EMR.
>>
>> I think a Glue job would be the best fit for me because I can calculate the
>> DPUs needed (maybe keeping some extra buffer), so I just wanted to check if
>> there are any edge cases I need to consider.
>>
>>
>> On Tue, May 28, 2024 at 5:39 AM Russell Jurney 
>> wrote:
>>
>>> If you’re using EMR and Spark, you need to choose nodes with enough RAM
>>> to accommodate any given partition in your data or you can get an OOM
>>> error. Not sure if this job involves a reduce, but I would choose a single
>>> 128GB+ memory-optimized instance and then adjust parallelism as per the
>>> Spark docs, using pyspark.sql.DataFrame.repartition(n) at the start of your
>>> job.
>>>
>>> Thanks,
>>> Russell Jurney @rjurney <http://twitter.com/rjurney>
>>> russell.jur...@gmail.com LI <http://linkedin.com/in/russelljurney> FB
>>> <http://facebook.com/jurney> datasyndrome.com
>>>
>>>
>>> On Mon, May 27, 2024 at 9:15 AM Perez  wrote:
>>>
>>>> Hi Team,
>>>>
>>>> I want to extract the data from DB and just dump it into S3. I
>>>> don't have to perform any transformations on the data yet. My data size
>>>> would be ~100 GB (historical load).
>>>>
>>>> Choosing the right DPUs (Glue jobs) should solve this problem, right? Or
>>>> should I move to EMR?
>>>>
>>>> I don't feel the need to move to EMR but wanted expert
>>>> suggestions.
>>>>
>>>> TIA.
>>>>
>>>


Re: OOM concern

2024-05-27 Thread Perez
Thank you everyone for your response.

I am not getting any errors as of now. I am just trying to choose the right
tool for my task, which is data loading from an external source into S3 via
Glue/EMR.

I think a Glue job would be the best fit for me because I can calculate the DPUs
needed (maybe keeping some extra buffer), so I just wanted to check if there
are any edge cases I need to consider.


On Tue, May 28, 2024 at 5:39 AM Russell Jurney 
wrote:

> If you’re using EMR and Spark, you need to choose nodes with enough RAM to
> accommodate any given partition in your data or you can get an OOM error.
> Not sure if this job involves a reduce, but I would choose a single 128GB+
> memory-optimized instance and then adjust parallelism as per the Spark docs,
> using pyspark.sql.DataFrame.repartition(n) at the start of your job.
>
> Thanks,
> Russell Jurney @rjurney <http://twitter.com/rjurney>
> russell.jur...@gmail.com LI <http://linkedin.com/in/russelljurney> FB
> <http://facebook.com/jurney> datasyndrome.com
>
>
> On Mon, May 27, 2024 at 9:15 AM Perez  wrote:
>
>> Hi Team,
>>
>> I want to extract the data from DB and just dump it into S3. I
>> don't have to perform any transformations on the data yet. My data size
>> would be ~100 GB (historical load).
>>
>> Choosing the right DPUs (Glue jobs) should solve this problem, right? Or
>> should I move to EMR?
>>
>> I don't feel the need to move to EMR but wanted expert suggestions.
>>
>> TIA.
>>
>


OOM concern

2024-05-27 Thread Perez
Hi Team,

I want to extract the data from DB and just dump it into S3. I
don't have to perform any transformations on the data yet. My data size
would be ~100 GB (historical load).

Choosing the right DPUs (Glue jobs) should solve this problem, right? Or
should I move to EMR?

I don't feel the need to move to EMR but wanted expert suggestions.

TIA.


Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-05 Thread Perez
You can try the 'OPTIMIZE' command of Delta Lake. That will help you for
sure; it merges small files. Also, it depends on the file format. If you
are working with Parquet, then small files should still not cause any issues.

P.
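For archive readers, a rough sketch of the compaction being suggested; the
table path is a placeholder, and the optimize()/executeCompaction() API (and the
SQL OPTIMIZE command) require a reasonably recent open-source Delta Lake
release:

```
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = SparkSession.builder.appName("delta-compaction").getOrCreate()

# Bin-pack many small files into fewer, larger ones.
delta_table = DeltaTable.forPath(spark, "s3://bucket/delta/events")
delta_table.optimize().executeCompaction()

# Equivalent SQL form:
spark.sql("OPTIMIZE delta.`s3://bucket/delta/events`")
```

Note that the Spark session also needs the Delta Lake extensions configured for
this to run.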

On Thu, Oct 5, 2023 at 10:55 AM Shao Yang Hong
 wrote:

> Hi Raghavendra,
>
> Yes, we are trying to reduce the number of files in delta as well (the
> small file problem [0][1]).
>
> We already have a scheduled app to compact files, but the number of
> files is still large, at 14K files per day.
>
> [0]: https://docs.delta.io/latest/optimizations-oss.html#language-python
> [1]:
> https://delta.io/blog/2023-01-25-delta-lake-small-file-compaction-optimize/
>
> On Thu, Oct 5, 2023 at 12:53 PM Raghavendra Ganesh
>  wrote:
> >
> > Hi,
> > What is the purpose for which you want to use repartition() .. to reduce
> the number of files in delta?
> > Also note that there is an alternative option of using coalesce()
> instead of repartition().
> > --
> > Raghavendra
> >
> >
> > On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong <
> shaoyang.h...@ninjavan.co.invalid> wrote:
> >>
> >> Hi all on user@spark:
> >>
> >> We are looking for advice and suggestions on how to tune the
> >> .repartition() parameter.
> >>
> >> We are using Spark Streaming on our data pipeline to consume messages
> >> and persist them to a Delta Lake
> >> (https://delta.io/learn/getting-started/).
> >>
> >> We read messages from a Kafka topic, then add a generated date column
> >> as a daily partitioning, and save these records to Delta Lake. We have
> >> 60 Kafka partitions on the Kafka topic, 15 Spark executor instances
> >> (so 4 Kafka partitions per executor).
> >>
> >> How then, should we use .repartition()? Should we omit this parameter?
> >> Or set it to 15? or 4?
> >>
> >> Our code looks roughly like the below:
> >>
> >> ```
> >> df = (
> >> spark.readStream.format("kafka")
> >> .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
> >> .option("subscribe", os.environ["KAFKA_TOPIC"])
> >> .load()
> >> )
> >>
> >> table = (
> >> df.select(
> >> from_protobuf(
> >> "value", "table", "/opt/protobuf-desc/table.desc"
> >> ).alias("msg")
> >> )
> >> .withColumn("uuid", col("msg.uuid"))
> >> # etc other columns...
> >>
> >> # generated column for daily partitioning in Delta Lake
> >> .withColumn(CREATED_DATE,
> >> date_format(from_unixtime("msg.logged_at"), "-MM-dd"))
> >> .drop("msg")
> >> )
> >>
> >> query = (
> >> table
> >> .repartition(10).writeStream
> >> .queryName(APP_NAME)
> >> .outputMode("append")
> >> .format("delta")
> >> .partitionBy(CREATED_DATE)
> >> .option("checkpointLocation", os.environ["CHECKPOINT"])
> >> .start(os.environ["DELTA_PATH"])
> >> )
> >>
> >> query.awaitTermination()
> >> spark.stop()
> >> ```
> >>
> >> Any advice would be appreciated.
> >>
> >> --
> >> Best Regards,
> >> Shao Yang HONG
> >>
> >> -
> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>
>
>
> --
> Best Regards,
> Shao Yang HONG
> Software Engineer, Pricing, Tech
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
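For readers weighing the options discussed above: repartition(n) forces a full
shuffle down to exactly n write tasks per micro-batch, while coalesce(n) only
merges existing partitions without a shuffle, at the risk of uneven partition
sizes. A sketch built on the quoted snippet; the number 15 is illustrative only:

```
import os

# `table` and CREATED_DATE come from the quoted code above.
# Option 1: full shuffle -- evens out skew, costs a shuffle every micro-batch.
partitioned = table.repartition(15)   # e.g. roughly one write task per executor

# Option 2: narrow coalesce -- no shuffle, but partition sizes may stay uneven.
# partitioned = table.coalesce(15)

query = (
    partitioned.writeStream
    .outputMode("append")
    .format("delta")
    .partitionBy(CREATED_DATE)
    .option("checkpointLocation", os.environ["CHECKPOINT"])
    .start(os.environ["DELTA_PATH"])
)
```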


Re: cache OS memory and spark usage of it

2018-04-10 Thread Jose Raul Perez Rodriguez

It was helpful.

Then, the OS needs to feel some pressure from the applications
requesting memory before it frees some of the memory cache?

Exactly under which circumstances does the OS free that memory to give it to
the applications requesting it?

I mean, if the total memory is 16GB and 10GB are used for OS cache, how
can the JVM obtain memory from that?

Thanks,


On 11/04/18 01:36, yncxcw wrote:

hi, Raúl

First, most of the OS memory cache is used by the page cache, which the OS
uses for caching recent read/write I/O.

I think OS memory cache should be understood from two different perspectives.
From the perspective of user space (e.g., a Spark application), it is not used,
since Spark is not allocating memory from this part of memory. However, from
the perspective of the OS, it is actually used, because the memory pages are
already allocated for caching the I/O pages. For each I/O request, the OS
always allocates memory pages to cache it, expecting that these cached I/O
pages can be reused in the near future. Recall what happens when you use
vim/emacs to open a large file: it is pretty slow the first time you open it,
but it will be much faster if you close it and open it again immediately,
because the file has been cached in the file cache the first time you opened it.

It is hard for Spark to use this part of memory, because this part of the
memory is managed by the OS and is transparent to applications. The only thing
you can do is continuously allocate memory from the OS (by malloc()); at a
certain point, when the OS senses memory pressure, it will voluntarily release
the page cache to satisfy your memory allocation. Another thing is that the
memory limit of Spark is bounded by the maximum JVM heap size, so a memory
request from your Spark application is actually handled by the JVM, not the OS.


Hope this answer can help you!


Wei
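As a concrete illustration of that last point: the memory Spark can actually use
is whatever JVM heap you request at session/submit time, independent of how much
RAM the OS is currently spending on page cache. The values below are arbitrary
examples:

```
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("heap-vs-page-cache")
    # Executor JVM heap; the OS page cache is not part of this budget.
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)
```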




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Kafka 0.10 & Spark Streaming 2.0.2

2016-12-02 Thread Gabriel Perez
I had it set up with three nodes, a master and 2 slaves. Is there anything that
would tell me it was in local mode? I also added the --deploy-mode cluster
flag and saw the same results.

Thanks,
Gabe

From: Mich Talebzadeh 
Date: Friday, December 2, 2016 at 12:26 PM
To: Gabriel Perez 
Cc: Jacek Laskowski , user 
Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2

in this POC of yours are you running this app with spark  in Local mode by any 
chance?


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.



On 2 December 2016 at 16:54, Gabriel Perez 
mailto:gabr...@adtheorent.com>> wrote:
Hi,

The total partitions are 128, and I can tell it's one executor because in the
consumer list for Kafka I see only one thread pulling, and in the master Spark
UI I see the executor thread id is showing as 0 and that's it.

Thanks,
Gabe


From: Jacek Laskowski mailto:ja...@japila.pl>>
Date: Friday, December 2, 2016 at 11:47 AM
To: Gabriel Perez mailto:gabr...@adtheorent.com>>
Cc: user mailto:user@spark.apache.org>>
Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2

Hi,

How many partitions does the topic have? How do you check how many executors 
read from the topic?

Jacek


On 2 Dec 2016 2:44 p.m., "gabrielperez2484" 
mailto:gabr...@adtheorent.com>> wrote:
Hello,

I am trying to perform a POC between Kafka 0.10 and Spark 2.0.2. Currently I
am running into an issue, where only one executor ("kafka consumer") is
reading from the topic. Which is causing performance to be really poor. I
have tried adding "--num-executors 8" both in the script to execute the jar
and in my java code. Here is the code below. Please let me know if I am
missing something or there is a way to increase the number of consumers to
connect to kafka.


Thanks,
Gabe


Map kafkaParams = new HashMap<>();
kafkaParams.put( "bootstrap.servers", "server:9092" );
kafkaParams.put( "key.deserializer", StringDeserializer.class );
kafkaParams.put( "value.deserializer", StringDeserializer.class 
);
kafkaParams.put( "group.id<http://group.id>", 
"spark-aggregation" );
kafkaParams.put( "auto.offset.reset", "earliest" );
kafkaParams.put( 
"request.timeout.ms<http://request.timeout.ms>", "305000" );
kafkaParams.put( 
"heartbeat.interval.ms<http://heartbeat.interval.ms>", "85000" );
kafkaParams.put( 
"session.timeout.ms<http://session.timeout.ms>", "9" );

Collection topics = Arrays.asList( "Topic" );

SparkConf sparkConf = new SparkConf().setMaster( 
"spark://server:7077" )
.setAppName( "aggregation" ).set( 
"spark.submit.deployMode", "cluster" )
.set( "spark.executor.instances", "16" );

JavaStreamingContext javaStreamingContext = new 
JavaStreamingContext(
sparkConf, new Duration( 5000 ) );

//Creates connect to the Stream.
final JavaInputDStream> 
stream =
KafkaUtils.createDirectStream(
javaStreamingContext, 
LocationStrategies.PreferConsistent(),
ConsumerStrategies. Subscribe( 
topics, kafkaParams ) );

//JavaPairDStream unifiedStream =
javaStreamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1,
kafkaStreams.size()));

JavaDStream records = stream.map( new
Function, String>() {

private static final long serialVersionUID = 1L;

@Override
/**
 * Pulling key from the stream and creating the 
aggregation key.
 */
public String call( ConsumerRecord 
record ) {


return record.key();

}
} );

JavaPairDStream pairs = records.mapToPair( new
PairFunction() {

private static final long serialVersionUID = 1L;

@Override
/**
 * Creating new tuple to perform calculations on.
 *

Re: Kafka 0.10 & Spark Streaming 2.0.2

2016-12-02 Thread Gabriel Perez
We actually ended up reverting back to 0.9.0 in my testing environment because
we found other products weren’t ready to go for 0.10 as well, so I am not able
to create those snapshots. Hopefully I don’t see the same issue with 0.9.0.
Thank you for your help, though.

Thanks,
Gabe

From: Jacek Laskowski 
Date: Friday, December 2, 2016 at 12:21 PM
To: Gabriel Perez 
Cc: user 
Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2

Hi,

Can you post the screenshot of the Executors and Streaming tabs?

Jacek

On 2 Dec 2016 5:54 p.m., "Gabriel Perez" 
mailto:gabr...@adtheorent.com>> wrote:
Hi,

The total partitions are 128, and I can tell it's one executor because in the
consumer list for Kafka I see only one thread pulling, and in the master Spark
UI I see the executor thread id is showing as 0 and that's it.

Thanks,
Gabe


From: Jacek Laskowski mailto:ja...@japila.pl>>
Date: Friday, December 2, 2016 at 11:47 AM
To: Gabriel Perez mailto:gabr...@adtheorent.com>>
Cc: user mailto:user@spark.apache.org>>
Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2

Hi,

How many partitions does the topic have? How do you check how many executors 
read from the topic?

Jacek


On 2 Dec 2016 2:44 p.m., "gabrielperez2484" 
mailto:gabr...@adtheorent.com>> wrote:
Hello,

I am trying to perform a POC between Kafka 0.10 and Spark 2.0.2. Currently I
am running into an issue, where only one executor ("kafka consumer") is
reading from the topic. Which is causing performance to be really poor. I
have tried adding "--num-executors 8" both in the script to execute the jar
and in my java code. Here is the code below. Please let me know if I am
missing something or there is a way to increase the number of consumers to
connect to kafka.


Thanks,
Gabe


Map kafkaParams = new HashMap<>();
kafkaParams.put( "bootstrap.servers", "server:9092" );
kafkaParams.put( "key.deserializer", StringDeserializer.class );
kafkaParams.put( "value.deserializer", StringDeserializer.class 
);
kafkaParams.put( "group.id<http://group.id>", 
"spark-aggregation" );
kafkaParams.put( "auto.offset.reset", "earliest" );
kafkaParams.put( 
"request.timeout.ms<http://request.timeout.ms>", "305000" );
kafkaParams.put( 
"heartbeat.interval.ms<http://heartbeat.interval.ms>", "85000" );
kafkaParams.put( 
"session.timeout.ms<http://session.timeout.ms>", "9" );

Collection topics = Arrays.asList( "Topic" );

SparkConf sparkConf = new SparkConf().setMaster( 
"spark://server:7077" )
.setAppName( "aggregation" ).set( 
"spark.submit.deployMode", "cluster" )
.set( "spark.executor.instances", "16" );

JavaStreamingContext javaStreamingContext = new 
JavaStreamingContext(
sparkConf, new Duration( 5000 ) );

//Creates connect to the Stream.
final JavaInputDStream> 
stream =
KafkaUtils.createDirectStream(
javaStreamingContext, 
LocationStrategies.PreferConsistent(),
ConsumerStrategies. Subscribe( 
topics, kafkaParams ) );

//JavaPairDStream unifiedStream =
javaStreamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1,
kafkaStreams.size()));

JavaDStream records = stream.map( new
Function, String>() {

private static final long serialVersionUID = 1L;

@Override
/**
 * Pulling key from the stream and creating the 
aggregation key.
 */
public String call( ConsumerRecord 
record ) {


return record.key();

}
} );

JavaPairDStream pairs = records.mapToPair( new
PairFunction() {

private static final long serialVersionUID = 1L;

@Override
/**
 * Creating new tuple to perform calculations on.
 */
public Tuple2 call( String s ) {

return new Tuple2<>( s, 1 );
}
} );

JavaPairDStream counts = pairs.reduceByKey( new
Function2() {

private static final long serialVersionUID = 1L;

@O

Re: Kafka 0.10 & Spark Streaming 2.0.2

2016-12-02 Thread Gabriel Perez
Hi,

The total partitions are 128, and I can tell it's one executor because in the
consumer list for Kafka I see only one thread pulling, and in the master Spark
UI I see the executor thread id is showing as 0 and that's it.

Thanks,
Gabe


From: Jacek Laskowski 
Date: Friday, December 2, 2016 at 11:47 AM
To: Gabriel Perez 
Cc: user 
Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2

Hi,

How many partitions does the topic have? How do you check how many executors 
read from the topic?

Jacek


On 2 Dec 2016 2:44 p.m., "gabrielperez2484" 
mailto:gabr...@adtheorent.com>> wrote:
Hello,

I am trying to perform a POC between Kafka 0.10 and Spark 2.0.2. Currently I
am running into an issue, where only one executor ("kafka consumer") is
reading from the topic. Which is causing performance to be really poor. I
have tried adding "--num-executors 8" both in the script to execute the jar
and in my java code. Here is the code below. Please let me know if I am
missing something or there is a way to increase the number of consumers to
connect to kafka.


Thanks,
Gabe


Map kafkaParams = new HashMap<>();
kafkaParams.put( "bootstrap.servers", "server:9092" );
kafkaParams.put( "key.deserializer", StringDeserializer.class );
kafkaParams.put( "value.deserializer", StringDeserializer.class 
);
kafkaParams.put( "group.id<http://group.id>", 
"spark-aggregation" );
kafkaParams.put( "auto.offset.reset", "earliest" );
kafkaParams.put( 
"request.timeout.ms<http://request.timeout.ms>", "305000" );
kafkaParams.put( 
"heartbeat.interval.ms<http://heartbeat.interval.ms>", "85000" );
kafkaParams.put( 
"session.timeout.ms<http://session.timeout.ms>", "9" );

Collection topics = Arrays.asList( "Topic" );

SparkConf sparkConf = new SparkConf().setMaster( 
"spark://server:7077" )
.setAppName( "aggregation" ).set( 
"spark.submit.deployMode", "cluster" )
.set( "spark.executor.instances", "16" );

JavaStreamingContext javaStreamingContext = new 
JavaStreamingContext(
sparkConf, new Duration( 5000 ) );

//Creates connect to the Stream.
final JavaInputDStream> 
stream =
KafkaUtils.createDirectStream(
javaStreamingContext, 
LocationStrategies.PreferConsistent(),
ConsumerStrategies. Subscribe( 
topics, kafkaParams ) );

//JavaPairDStream unifiedStream =
javaStreamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1,
kafkaStreams.size()));

JavaDStream records = stream.map( new
Function, String>() {

private static final long serialVersionUID = 1L;

@Override
/**
 * Pulling key from the stream and creating the 
aggregation key.
 */
public String call( ConsumerRecord 
record ) {


return record.key();

}
} );

JavaPairDStream pairs = records.mapToPair( new
PairFunction() {

private static final long serialVersionUID = 1L;

@Override
/**
 * Creating new tuple to perform calculations on.
 */
public Tuple2 call( String s ) {

return new Tuple2<>( s, 1 );
}
} );

JavaPairDStream counts = pairs.reduceByKey( new
Function2() {

private static final long serialVersionUID = 1L;

@Override
/**
 * perform counts...
 */
public Integer call( Integer i1, Integer i2 ) {

return i1 + i2;
}
} );

stream.foreachRDD( new 
VoidFunction>>() {

/**
*
*/
private static final long serialVersionUID = 1L;

@Override
public void call( JavaRDD> rdd ) {

OffsetRange[] offsetRanges = ( 
(HasOffsetRanges) rdd.rdd()
).offsetRanges();

// some time later, after outputs have completed
( (Can

Re: VectorAssembler handling null values

2016-04-20 Thread Andres Perez
so the missing data could be on a one-off basis, or from fields that are in
general optional, or from, say, a count that is only relevant for certain
cases (very sparse):

f1|f2|f3|optF1|optF2|sparseF1
a|15|3.5|cat1|142L|
b|13|2.4|cat2|64L|catA
c|2|1.6|||
d|27|5.1||0|

-Andy

On Wed, Apr 20, 2016 at 1:38 AM, Nick Pentreath 
wrote:

> Could you provide an example of what your input data looks like?
> Supporting missing values in a sparse result vector makes sense.
>
> On Tue, 19 Apr 2016 at 23:55, Andres Perez  wrote:
>
>> Hi everyone. org.apache.spark.ml.feature.VectorAssembler currently cannot
>> handle null values. This presents a problem for us as we wish to run a
>> decision tree classifier on sometimes sparse data. Is there a particular
>> reason VectorAssembler is implemented in this way, and can anyone recommend
>> the best path for enabling VectorAssembler to build vectors for data that
>> will contain empty values?
>>
>> Thanks!
>>
>> -Andres
>>
>>


VectorAssembler handling null values

2016-04-19 Thread Andres Perez
Hi everyone. org.apache.spark.ml.feature.VectorAssembler currently cannot
handle null values. This presents a problem for us as we wish to run a
decision tree classifier on sometimes sparse data. Is there a particular
reason VectorAssembler is implemented in this way, and can anyone recommend
the best path for enabling VectorAssembler to build vectors for data that
will contain empty values?

Thanks!

-Andres
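A common workaround, for anyone landing on this thread later, is to fill or
impute the missing values before assembling. A minimal PySpark sketch under the
assumption that the optional fields are numeric; the column names and values are
made up:

```
from pyspark.sql import SparkSession
from pyspark.ml.feature import Imputer, VectorAssembler

spark = SparkSession.builder.appName("assemble-with-nulls").getOrCreate()

# Hypothetical frame with an optional numeric column that contains nulls.
df = spark.createDataFrame(
    [(15, 3.5, 142.0), (13, 2.4, 64.0), (2, 1.6, None), (27, 5.1, 0.0)],
    ["f2", "f3", "optF2"],
)

# Replace nulls with the column mean; df.na.fill(...) works for a constant instead.
imputer = Imputer(inputCols=["optF2"], outputCols=["optF2_filled"], strategy="mean")
filled = imputer.fit(df).transform(df)

assembler = VectorAssembler(inputCols=["f2", "f3", "optF2_filled"], outputCol="features")
assembler.transform(filled).select("features").show(truncate=False)
```

Note that Imputer, and VectorAssembler's handleInvalid parameter, only arrived in
Spark releases later than the one discussed in this thread.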


Re: Serializers problems maping RDDs to objects again

2015-11-06 Thread Iker Perez de Albeniz
I have seen that the problem is with the Geohash class, which cannot be
pickled... but in groupByKey I use another custom class and there is no
problem...

2015-11-06 13:44 GMT+01:00 Iker Perez de Albeniz :

> Hi All,
>
> I am new at this list. Before sending this mail i have searched on archive
> but i have not found a solution for me.
>
> I am using Spark to process user locations based on RSSI. My Spark script
> looks like this:
>
> text_files = sc.textFile(','.join(files[startime]))
>
> result = text_files.flatMap(
> lambda x : s3_files_to_records(x, startime) # Load from JSON
> converted CSV logs
> ).filter(
> lambda x: x['mac_time'] # Filter empty fusids
> ).map(
> lambda x: (x['mac_time'], x)# Generate K-V store
> ).combineByKey(
> preprocess_create_combinator,
> preprocess_merge_value,
> preprocess_merge_combiners
> )
>
> Using combineByKey i bilaterate/trilaterate to get device positions
>
> so if I do a
>
> result.collect()
>
> i get something like this:
>
> [
> (u'B4:3A:28:2A:AF:84_1444299600', {'latitude': 43.348926, 'iters': 1,
> 'longitude': -3.011294, 'error': 388.82514299844314}),
> (u'30:A8:DB:9F:A0:35_1444299600', {'latitude': 43.348926, 'iters': 2,
> 'longitude': -3.011294, 'error': 61.62463221959518}),
> (u'00:18:DE:94:F2:DF_1444299600', {'latitude': 43.348679, 'iters': 1,
> 'longitude': -3.010883, 'error': 436.2689859408533}),
> (u'98:03:D8:65:E5:94_1444299600', {'latitude': 43.348722, 'iters': 1,
> 'longitude': -3.011031, 'error': 346.54077346735033}),
> (u'68:DF:DD:EF:ED:21_1444299600', {'latitude': 43.348722, 'iters': 1,
> 'longitude': -3.011031, 'error': 436.2689859408533})
> ]
>
> I want to convert each RDD record to an object again to start another
> map/filter/map/combineByKey process from that result, so for that I have
> created a new function that converts every record to a new tuple:
> def convert_rdd_to_object(rdd):
> geohash = Geohash()
>
> result_object = rdd[1]
> result_object["mac"] = rdd[0].split("_")[0]
> result_object["timestamp"] = rdd[0].split("_")[1]
> result_object["geohash"] =
> str(geohash.encode(result_object["latitude"],result_object["longitude"]))
> return (result_object["mac"], result_object)
>
> the idea was to get something like this:
>
> [
> (u'B4:3A:28:2A:AF:84', {'latitude': 43.348926, 'iters': 1,
> 'longitude': -3.011294, 'error': 388.82514299844314, 'mac':
> 'B4:3A:28:2A:AF:84', 'timestamp': 1444299600, 'geohash': rwqerqwerw}),
> (u'30:A8:DB:9F:A0:35', {'latitude': 43.348926, 'iters': 2,
> 'longitude': -3.011294, 'error': 61.62463221959518, 'mac':
> '30:A8:DB:9F:A0:35', 'timestamp': 1444299600, 'geohash': rwqerqwerw}),
> (u'00:18:DE:94:F2:DF', {'latitude': 43.348679, 'iters': 1,
> 'longitude': -3.010883, 'error': 436.2689859408533, 'mac':
> '00:18:DE:94:F2:DF', 'timestamp': 1444299600, 'geohash': rwqerqwerw}),
> (u'98:03:D8:65:E5:94', {'latitude': 43.348722, 'iters': 1,
> 'longitude': -3.011031, 'error': 346.54077346735033, 'mac':
> '98:03:D8:65:E5:94', 'timestamp': 1444299600, 'geohash': rwqerqwerw}),
> (u'68:DF:DD:EF:ED:21', {'latitude': 43.348722, 'iters': 1,
> 'longitude': -3.011031, 'error': 436.2689859408533, 'mac':
> '68:DF:DD:EF:ED:21', 'timestamp': 1444299600, 'geohash': rwqerqwerw})
> ]
>
>
> So my attempt was to add another map at the end:
>
>result = text_files.flatMap(
> lambda x : s3_files_to_records(x, startime) # Load from JSON
> converted CSV logs
> ).filter(
> lambda x: x['mac_time'] # Filter empty fusids
> ).map(
> lambda x: (x['mac_time'], x)# Generate K-V store
> ).combineByKey(
> preprocess_create_combinator,
> preprocess_merge_value,
> preprocess_merge_combiners
>

Serializers problems maping RDDs to objects again

2015-11-06 Thread Iker Perez de Albeniz
oop2.4/python/lib/pyspark.zip/pyspark/rdd.py",
line 757, in collect
  File
"/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/rdd.py",
line 2363, in _jrdd
  File
"/home/iker/Workspace//spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/rdd.py",
line 2283, in _prepare_for_python_RDD
  File
"/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/serializers.py",
line 427, in dumps
  File
"/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/cloudpickle.py",
line 622, in dumps
  File
"/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/cloudpickle.py",
line 107, in dump
  File "/usr/lib/python2.7/pickle.py", line 224, in dump
self.save(obj)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 562, in save_tuple
save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
  File
"/home/iker/Workspace//spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/cloudpickle.py",
line 199, in save_function
  File
"/home/iker/Workspace//spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/cloudpickle.py",
line 236, in save_function_tuple


I do not know if the problem is that I am not understanding how Spark
works or something else, but I do not see how to make it work and continue
map/filter/reducing the data in several concatenated steps.


Regards,

-- 
[image: Fon] <http://www.fon.com/>Iker Perez de AlbenizSenior R&D Engineer
/ Technical Lead+34 946545843Skype: iker.perez.fonAll information in this
email is confidential <http://corp.fon.com/legal/email-disclaimer>
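For archive readers hitting the same pickling error: a common workaround is to
avoid capturing the problematic class in the driver-side closure at all, and
instead import and construct it inside the task, for example once per partition
with mapPartitions. A rough sketch; the geohash module name is an assumption,
and it would need to be shipped to the executors (e.g. via --py-files):

```
def convert_partition(records):
    # Imported and constructed inside the task, so the driver only has to
    # pickle this function, never the Geohash object itself.
    from geohash_module import Geohash  # hypothetical module name
    geohash = Geohash()

    for key, info in records:
        mac, timestamp = key.split("_")
        info["mac"] = mac
        info["timestamp"] = timestamp
        info["geohash"] = str(geohash.encode(info["latitude"], info["longitude"]))
        yield (mac, info)

# `result` is the K-V RDD produced by the combineByKey step in the quoted code.
converted = result.mapPartitions(convert_partition)
```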


Re: stopped SparkContext remaining active

2015-07-29 Thread Andres Perez
at scala.concurrent.Await$.result(package.scala:190)15/07/29
15:17:09 INFO YarnClientSchedulerBackend: Asking each executor to shut down

at
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
at
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
at
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stopExecutors(CoarseGrainedSchedulerBackend.scala:257)
... 6 more
15/07/29 15:17:09 ERROR YarnScheduler: Lost executor 2 on node09.tresata.com:
remote Rpc client disassociated
15/07/29 15:17:09 ERROR YarnScheduler: Lost executor 1 on node06.tresata.com:
remote Rpc client disassociated
5/07/29 15:19:09 WARN HeartbeatReceiver: Removing executor 2 with no recent
heartbeats: 128182 ms exceeds timeout 12 ms
15/07/29 15:19:09 ERROR YarnScheduler: Lost an executor 2 (already
removed): Executor heartbeat timed out after 128182 ms
15/07/29 15:19:09 WARN HeartbeatReceiver: Removing executor 1 with no
recent heartbeats: 126799 ms exceeds timeout 12 ms
15/07/29 15:19:09 ERROR YarnScheduler: Lost an executor 1 (already
removed): Executor heartbeat timed out after 126799 ms
15/07/29 15:19:09 INFO YarnClientSchedulerBackend: Requesting to kill
executor(s) 2
15/07/29 15:19:09 WARN YarnClientSchedulerBackend: Executor to kill 2 does
not exist!
15/07/29 15:19:09 WARN Remoting: Tried to associate with unreachable remote
address [akka.tcp://sparkYarnAM@]. Address is now gated
for 5000 ms, all messages to this address will be delivered to dead
letters. Reason: Connection refused: /
15/07/29 15:19:09 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
ApplicationMaster has disassociated: 

It seems like the call,

driverEndpoint.askWithRetry[Boolean](StopExecutors)

inside the stopExecutors() method of CoarseGrainedSchedulerBackend is
throwing an exception, which then bubbles up as a SparkException ("Error
asking standalone scheduler to shut down executors"). This is then not
caught by the SparkContext.stop() method (and thus stop() never reaches
the clearActiveContext() call). Does this sound right?

Also, does bq == be quiet???

Thanks for the reply!

-Andres

On Wed, Jul 29, 2015 at 1:10 PM, Ted Yu  wrote:

> bq. it seems like we never get to the clearActiveContext() call by the end
>
> Looking at stop() method, there is only one early return
> after stopped.compareAndSet() call.
> Is there any clue from driver log ?
>
> Cheers
>
> On Wed, Jul 29, 2015 at 9:38 AM, Andres Perez  wrote:
>
>> Hi everyone. I'm running into an issue with SparkContexts when running on
>> Yarn. The issue is observable when I reproduce these steps in the
>> spark-shell (version 1.4.1):
>>
>> scala> sc
>> res0: org.apache.spark.SparkContext =
>> org.apache.spark.SparkContext@7b965dee
>>
>> *Note the pointer address of sc.
>>
>> (Then yarn application -kill  on the corresponding yarn
>> application)
>>
>> scala> val rdd = sc.parallelize(List(1,2,3))
>> java.lang.IllegalStateException: Cannot call methods on a stopped
>> SparkContext
>>   at
>> org.apache.spark.SparkContext.org
>> $apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:103)
>>
>>   at
>> org.apache.spark.SparkContext.defaultParallelism(SparkContext.scala:1914)
>>   at
>>
>> org.apache.spark.SparkContext.parallelize$default$2(SparkContext.scala:695)
>>   ... 49 elided
>>
>> (Great, the SparkContext has been stopped by the killed yarn application,
>> as
>> expected.)
>>
>> alternatively:
>>
>> scala> sc.stop()
>> 15/07/29 12:10:14 INFO SparkContext: SparkContext already stopped.
>>
>> (OK, so it's confirmed that it has been stopped.)
>>
>> scala> org.apache.spark.SparkContext.getOrCreate
>> res3: org.apache.spark.SparkContext =
>> org.apache.spark.SparkContext@7b965dee
>>
>> (Hm, that's the same SparkContext, note the pointer address.)
>>
>> The issue here is that the SparkContext.getOrCreate method returns either
>> the active SparkContext, if it exists, or creates a new one. Here it is
>> returning the original SparkContext, meaning the one we verified was
>> stopped
>> above is still active. How can we recover from this? We can't use the
>> current one once it's been stopped (unless we allow for multiple contexts
>> to
>> run using the spark.driver.allowMultipleContexts flag, but that's a
>> band-aid
>> solution), and we can't seem to create a new one, because the old one is
>> still marked as active.
>>
>> Digging a little deeper, in the body of the stop() method of SparkContext,
>> it seems like we never get to the clearActiveContext() call by the end,
>

stopped SparkContext remaining active

2015-07-29 Thread Andres Perez
Hi everyone. I'm running into an issue with SparkContexts when running on
Yarn. The issue is observable when I reproduce these steps in the
spark-shell (version 1.4.1):

scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@7b965dee

*Note the pointer address of sc.

(Then yarn application -kill  on the corresponding yarn
application)

scala> val rdd = sc.parallelize(List(1,2,3))
java.lang.IllegalStateException: Cannot call methods on a stopped
SparkContext
  at
org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:103)
  at
org.apache.spark.SparkContext.defaultParallelism(SparkContext.scala:1914)
  at
org.apache.spark.SparkContext.parallelize$default$2(SparkContext.scala:695)
  ... 49 elided

(Great, the SparkContext has been stopped by the killed yarn application, as
expected.)

alternatively:

scala> sc.stop()
15/07/29 12:10:14 INFO SparkContext: SparkContext already stopped.

(OK, so it's confirmed that it has been stopped.)

scala> org.apache.spark.SparkContext.getOrCreate
res3: org.apache.spark.SparkContext = org.apache.spark.SparkContext@7b965dee

(Hm, that's the same SparkContext, note the pointer address.)

The issue here is that the SparkContext.getOrCreate method returns either
the active SparkContext, if it exists, or creates a new one. Here it is
returning the original SparkContext, meaning the one we verified was stopped
above is still active. How can we recover from this? We can't use the
current one once it's been stopped (unless we allow for multiple contexts to
run using the spark.driver.allowMultipleContexts flag, but that's a band-aid
solution), and we can't seem to create a new one, because the old one is
still marked as active.
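
For reference, a minimal sketch of the allowMultipleContexts band-aid mentioned
above, assuming Spark 1.4.x and a hypothetical app name; it only sidesteps the
stale active-context registration rather than fixing it:

// Hedged sketch: the stopped context is still registered as "active", so
// getOrCreate keeps handing it back. Allowing multiple contexts lets a
// fresh one be constructed alongside the stale entry.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("recovered-session")                    // hypothetical app name
  .set("spark.driver.allowMultipleContexts", "true")  // downgrades the "already active" check to a warning

val freshSc = new SparkContext(conf)                  // usable replacement for the stopped sc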

Digging a little deeper, in the body of the stop() method of SparkContext,
it seems like we never get to the clearActiveContext() call by the end,
which would have marked the context as inactive. Any future call to stop(),
however, will exit early since the stopped variable is true (hence the
"SparkContext already stopped." log message). So I don't see any other way
to mark the context as not active. Something about how the SparkContext was
stopped after killing the yarn application is preventing the SparkContext
from cleaning up properly.

Any ideas about this?

Thanks,

Andres



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/stopped-SparkContext-remaining-active-tp24065.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Super slow caching in 1.3?

2015-04-27 Thread Christian Perez
Michael,

There is only one schema: both versions have 200 string columns in one file.

On Mon, Apr 20, 2015 at 9:08 AM, Evo Eftimov  wrote:
> Now this is very important:
>
>
>
> “Normal RDDs” refers to “batch RDDs”. However, the default in-memory
> serialization of RDDs which are part of a DStream is “Serialized” rather than
> actual (hydrated) Objects. The Spark documentation states that
> “Serialization” is required for space and garbage collection efficiency (but
> creates higher CPU load) – which makes sense considering the large number of
> RDDs which get discarded in a streaming app.
>
>
>
> So what does Databricks actually recommend as the object-oriented model for RDD
> elements used in Spark Streaming apps – flat or not – and can you provide a
> detailed description / spec of both?
>
>
>
> From: Michael Armbrust [mailto:mich...@databricks.com]
> Sent: Thursday, April 16, 2015 7:23 PM
> To: Evo Eftimov
> Cc: Christian Perez; user
>
>
> Subject: Re: Super slow caching in 1.3?
>
>
>
> Here are the types that we specialize; other types will be much slower.
> This is only for Spark SQL, normal RDDs do not serialize data that is
> cached.  I'll also note that until yesterday we were missing FloatType
>
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala#L154
>
>
>
> Christian, can you provide the schema of the fast and slow datasets?
>
>
>
> On Thu, Apr 16, 2015 at 10:14 AM, Evo Eftimov  wrote:
>
> Michael, what exactly do you mean by "flattened" version/structure here, e.g.:
>
> 1. An Object with only primitive data types as attributes
> 2. An Object with  no more than one level of other Objects as attributes
> 3. An Array/List of primitive types
> 4. An Array/List of Objects
>
> This question is in general about RDDs not necessarily RDDs in the context
> of SparkSQL
>
> When answering can you also score how bad the performance of each of the
> above options is
>
>
> -Original Message-
> From: Christian Perez [mailto:christ...@svds.com]
> Sent: Thursday, April 16, 2015 6:09 PM
> To: Michael Armbrust
> Cc: user
> Subject: Re: Super slow caching in 1.3?
>
> Hi Michael,
>
> Good question! We checked 1.2 and found that it is also slow caching the
> same flat parquet file. Caching other file formats of the same data were
> faster by up to a factor of ~2. Note that the parquet file was created in
> Impala but the other formats were written by Spark SQL.
>
> Cheers,
>
> Christian
>
> On Mon, Apr 6, 2015 at 6:17 PM, Michael Armbrust 
> wrote:
>> Do you think you are seeing a regression from 1.2?  Also, are you
>> caching nested data or flat rows?  The in-memory caching is not really
>> designed for nested data and so performs pretty slowly here (it's just
>> falling back to kryo and even then there are some locking issues).
>>
>> If so, would it be possible to try caching a flattened version?
>>
>> CACHE TABLE flattenedTable AS SELECT ... FROM parquetTable
>>
>> On Mon, Apr 6, 2015 at 5:00 PM, Christian Perez 
>> wrote:
>>>
>>> Hi all,
>>>
>>> Has anyone else noticed very slow time to cache a Parquet file? It
>>> takes 14 s per 235 MB (1 block) uncompressed node local Parquet file
>>> on M2 EC2 instances. Or are my expectations way off...
>>>
>>> Cheers,
>>>
>>> Christian
>>>
>>> --
>>> Christian Perez
>>> Silicon Valley Data Science
>>> Data Analyst
>>> christ...@svds.com
>>> @cp_phd
>>>
>>>
>>
>
>
>
> --
> Christian Perez
> Silicon Valley Data Science
> Data Analyst
> christ...@svds.com
> @cp_phd
>
>
>



-- 
Christian Perez
Silicon Valley Data Science
Data Analyst
christ...@svds.com
@cp_phd




Re: Pyspark where do third parties libraries need to be installed under Yarn-client mode

2015-04-24 Thread Christian Perez
To run MLlib, you only need numpy on each node. For additional
dependencies, you can call spark-submit with the --py-files option and
pass the .zip or .egg files.

https://spark.apache.org/docs/latest/submitting-applications.html

Cheers,

Christian

On Fri, Apr 24, 2015 at 1:56 AM, Hoai-Thu Vuong  wrote:
> I use sudo pip install ... on each machine in the cluster, and don't have to
> think about how to submit the library.
>
> On Fri, Apr 24, 2015 at 4:21 AM dusts66  wrote:
>>
>> I am trying to figure out python library management.  So my question is:
>> Where do third-party Python libraries (e.g. numpy, scipy, etc.) need to
>> exist
>> if I'm running a spark job via 'spark-submit' against my cluster in 'yarn
>> client' mode?  Do the libraries need to only exist on the client (i.e. the
>> server executing the driver code) or do the libraries need to exist on the
>> datanode/worker nodes where the tasks are executed?  The documentation
>> seems
>> to indicate that under 'yarn client' the libraries are only needed on the
>> client machine, not the entire cluster.  If the libraries are needed across
>> all cluster machines, any suggestions on a deployment strategy or
>> dependency
>> management model that works well?
>>
>> Thanks
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-where-do-third-parties-libraries-need-to-be-installed-under-Yarn-client-mode-tp22639.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>



-- 
Christian Perez
Silicon Valley Data Science
Data Analyst
christ...@svds.com
@cp_phd




Re: Super slow caching in 1.3?

2015-04-16 Thread Christian Perez
Hi Michael,

Good question! We checked 1.2 and found that it is also slow caching
the same flat parquet file. Caching other file formats of the same
data were faster by up to a factor of ~2. Note that the parquet file
was created in Impala but the other formats were written by Spark SQL.

Cheers,

Christian

On Mon, Apr 6, 2015 at 6:17 PM, Michael Armbrust  wrote:
> Do you think you are seeing a regression from 1.2?  Also, are you caching
> nested data or flat rows?  The in-memory caching is not really designed for
> nested data and so performs pretty slowly here (it's just falling back to
> kryo and even then there are some locking issues).
>
> If so, would it be possible to try caching a flattened version?
>
> CACHE TABLE flattenedTable AS SELECT ... FROM parquetTable
>
> On Mon, Apr 6, 2015 at 5:00 PM, Christian Perez  wrote:
>>
>> Hi all,
>>
>> Has anyone else noticed very slow time to cache a Parquet file? It
>> takes 14 s per 235 MB (1 block) uncompressed node local Parquet file
>> on M2 EC2 instances. Or are my expectations way off...
>>
>> Cheers,
>>
>> Christian
>>
>> --
>> Christian Perez
>> Silicon Valley Data Science
>> Data Analyst
>> christ...@svds.com
>> @cp_phd
>>
>>
>



-- 
Christian Perez
Silicon Valley Data Science
Data Analyst
christ...@svds.com
@cp_phd




Super slow caching in 1.3?

2015-04-06 Thread Christian Perez
Hi all,

Has anyone else noticed very slow time to cache a Parquet file? It
takes 14 s per 235 MB (1 block) uncompressed node local Parquet file
on M2 EC2 instances. Or are my expectations way off...
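
For concreteness, a hedged sketch of how that first-materialization time can be
measured, using Spark 1.3.x-style APIs and a hypothetical path and table name:

// Minimal sketch: the first action against a cached table builds the
// in-memory columnar cache, so timing it isolates the caching cost.
val df = sqlContext.parquetFile("/data/sample.parquet")   // hypothetical path
df.registerTempTable("sample")
sqlContext.cacheTable("sample")                           // lazy: nothing is built yet
val t0 = System.nanoTime()
sqlContext.sql("SELECT COUNT(*) FROM sample").collect()   // first action populates the cache
println(s"first materialization: ${(System.nanoTime() - t0) / 1e9} s")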

Cheers,

Christian

-- 
Christian Perez
Silicon Valley Data Science
Data Analyst
christ...@svds.com
@cp_phd




Re: input size too large | Performance issues with Spark

2015-04-02 Thread Christian Perez
To Akhil's point, see the Tuning Data Structures section of the tuning guide.
Avoid standard Java collection HashMaps.

With fewer machines, try running 4 or 5 cores per executor and only
3-4 executors (1 per node):
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/.
That ought to reduce the shuffle performance hit (someone else confirm?).

On #7, see spark.sql.shuffle.partitions (default: 200) and
spark.default.parallelism for plain RDD shuffles.
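
A hedged sketch of what that sizing might look like, with purely illustrative
values and property names that assume a YARN or standalone deployment:

// Illustrative only: fewer, fatter executors (~5 cores each), plus an explicit
// parallelism setting so post-shuffle partition counts stay predictable.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("predictive-analytics")     // hypothetical app name
  .set("spark.executor.cores", "5")
  .set("spark.executor.instances", "4")   // YARN-style setting; adjust for your cluster manager
  .set("spark.executor.memory", "80g")
  .set("spark.default.parallelism", "64")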

On Sun, Mar 29, 2015 at 7:57 AM, Akhil Das  wrote:
> Go through this once, if you haven't read it already.
> https://spark.apache.org/docs/latest/tuning.html
>
> Thanks
> Best Regards
>
> On Sat, Mar 28, 2015 at 7:33 PM, nsareen  wrote:
>>
>> Hi All,
>>
>> I'm facing performance issues with my Spark implementation, and while briefly
>> investigating the WebUI logs, I noticed that my RDD size is 55GB, the
>> Shuffle Write is 10 GB, and the Input Size is 200GB. The application is a web
>> application which does predictive analytics, so we keep most of our data
>> in
>> memory. This observation was only for 30mins usage of the application on a
>> single user. We anticipate atleast 10-15 users of the application sending
>> requests in parallel, which makes me a bit nervous.
>>
>> One constraint we have is that we do not have too many nodes in a cluster,
>> we may end up with 3-4 machines at best, but they can be scaled up
>> vertically each having 24 cores / 512 GB ram etc. which can allow us to
>> make
>> a virtual 10-15 node cluster.
>>
>> Even then the input size & shuffle write is too high for my liking. Any
>> suggestions in this regard will be greatly appreciated as there aren't
>> much
>> resource on the net for handling performance issues such as these.
>>
>> Some pointers on my application's data structures & design
>>
>> 1) The RDD is a JavaPairRDD, with the Key a CustomPOJO containing 3-4
>> HashMaps and the Value containing 1 HashMap
>> 2) Data is loaded via JDBCRDD during application startup, which also tends
>> to take a lot of time, since we massage the data once it is fetched from
>> DB
>> and then save it as JavaPairRDD.
>> 3) Most of the data is structured, but we are still using JavaPairRDD,
>> have
>> not explored the option of Spark SQL though.
>> 4) We have only one SparkContext which caters to all the requests coming
>> into the application from various users.
>> 5) During a single user session user can send 3-4 parallel stages
>> consisting
>> of Map / Group By / Join / Reduce etc.
>> 6) We have to change the RDD structure using different types of group-by
>> operations since the user can do drill-down / drill-up of the data
>> (aggregation at a higher / lower level). This is where we make use of
>> groupBys, but there is a cost associated with this.
>> 7) We have observed that the initial RDDs we create have 40-odd
>> partitions, but after some stage executions like groupBys the partitions
>> increase to 200 or so; this was odd, and we haven't figured out why this
>> happens.
>>
>> In summary, we want to use Spark to give us the capability to process our
>> in-memory data structures very fast, as well as scale to a larger volume
>> when
>> required in the future.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/input-size-too-large-Performance-issues-with-Spark-tp22270.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>



-- 
Christian Perez
Silicon Valley Data Science
Data Analyst
christ...@svds.com
@cp_phd




Re: persist(MEMORY_ONLY) takes lot of time

2015-04-02 Thread Christian Perez
+1.

Caching is way too slow.
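
For what it's worth, a hedged sketch (Spark 1.3.x era, hypothetical path) of the
eager CACHE TABLE variant of the pattern quoted below; it doesn't make the cache
build any faster, but it at least pays the cost once, up front:

// Rough sketch only: the CACHE TABLE statement (unlike sqlContext.cacheTable)
// is eager, so the slow columnar build happens here rather than on the first query.
val inputP = sqlContext.parquetFile("/some/hdfs/path")   // hypothetical path
inputP.registerTempTable("sample_table")
sqlContext.sql("CACHE TABLE sample_table")               // materializes the cache now
val result = sqlContext.sql("SELECT COUNT(*) FROM sample_table")
result.collect()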

On Wed, Apr 1, 2015 at 12:33 PM, SamyaMaiti  wrote:
> Hi Experts,
>
> I have a parquet dataset of 550 MB ( 9 Blocks) in HDFS. I want to run SQL
> queries repetitively.
>
> Few questions :
>
> 1. When I do the below (persist to memory after reading from disk), it takes a
> lot of time to persist to memory. Any suggestions on how to tune this?
>
>  val inputP  = sqlContext.parquetFile("some HDFS path")
>  inputP.registerTempTable("sample_table")
>  inputP.persist(MEMORY_ONLY)
>  val result = sqlContext.sql("some sql query")
>  result.count
>
> Note: Once the data is persisted to memory, it takes a fraction of a second to
> return the query result from the second query onwards. So my concern is how to
> reduce the time when the data is first loaded into the cache.
>
>
> 2. I have observed that if I omit the below line,
>  inputP.persist(MEMORY_ONLY)
>   the first query execution is comparatively quick (say it takes
> 1 min), as the load-to-memory time is saved, but to my surprise the second
> time I run the same query it takes 30 sec, as inputP is not reconstructed
> from disk (checked from the UI).
>
>  So my question is: does Spark use some kind of internal caching for inputP
> in this scenario?
>
> Thanks in advance
>
> Regards,
> Sam
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/persist-MEMORY-ONLY-takes-lot-of-time-tp22343.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>



-- 
Christian Perez
Silicon Valley Data Science
Data Analyst
christ...@svds.com
@cp_phd




Re: saveAsTable broken in v1.3 DataFrames?

2015-03-20 Thread Christian Perez
Any other users interested in a feature
DataFrame.saveAsExternalTable() for making _useful_ external tables in
Hive, or am I the only one? Bueller? If I start a PR for this, will it
be taken seriously?

On Thu, Mar 19, 2015 at 9:34 AM, Christian Perez  wrote:
> Hi Yin,
>
> Thanks for the clarification. My first reaction is that if this is the
> intended behavior, it is a wasted opportunity. Why create a managed
> table in Hive that cannot be read from inside Hive? I think I
> understand now that you are essentially piggybacking on Hive's
> metastore to persist table info between/across sessions, but I imagine
> others might expect more (as I have.)
>
> We find ourselves wanting to do work in Spark and persist the results
> where other users (e.g. analysts using Tableau connected to
> Hive/Impala) can explore it. I imagine this is very common. I can, of
> course, save it as parquet and create an external table in hive (which
> I will do now), but saveAsTable seems much less useful to me now.
>
> Any other opinions?
>
> Cheers,
>
> C
>
> On Thu, Mar 19, 2015 at 9:18 AM, Yin Huai  wrote:
>> I meant table properties and serde properties are used to store metadata of
>> a Spark SQL data source table. We do not set other fields like SerDe lib.
>> For a user, the output of DESCRIBE EXTENDED/FORMATTED on a data source table
>> should not show unrelated stuff like Serde lib and InputFormat. I have
>> created https://issues.apache.org/jira/browse/SPARK-6413 to track the
>> improvement on the output of DESCRIBE statement.
>>
>> On Thu, Mar 19, 2015 at 12:11 PM, Yin Huai  wrote:
>>>
>>> Hi Christian,
>>>
>>> Your table is stored correctly in Parquet format.
>>>
>>> For saveAsTable, the table created is not a Hive table, but a Spark SQL
>>> data source table
>>> (http://spark.apache.org/docs/1.3.0/sql-programming-guide.html#data-sources).
>>> We are only using Hive's metastore to store the metadata (to be specific,
>>> only table properties and serde properties). When you look at table
>>> property, there will be a field called "spark.sql.sources.provider" and the
>>> value will be "org.apache.spark.sql.parquet.DefaultSource". You can also
>>> look at your files in the file system. They are stored by Parquet.
>>>
>>> Thanks,
>>>
>>> Yin
>>>
>>> On Thu, Mar 19, 2015 at 12:00 PM, Christian Perez 
>>> wrote:
>>>>
>>>> Hi all,
>>>>
>>>> DataFrame.saveAsTable creates a managed table in Hive (v0.13 on
>>>> CDH5.3.2) in both spark-shell and pyspark, but creates the *wrong*
>>>> schema _and_ storage format in the Hive metastore, so that the table
>>>> cannot be read from inside Hive. Spark itself can read the table, but
>>>> Hive throws a Serialization error because it doesn't know it is
>>>> Parquet.
>>>>
>>>> val df = sc.parallelize( Array((1,2), (3,4)) ).toDF("education",
>>>> "income")
>>>> df.saveAsTable("spark_test_foo")
>>>>
>>>> Expected:
>>>>
>>>> COLUMNS(
>>>>   education BIGINT,
>>>>   income BIGINT
>>>> )
>>>>
>>>> SerDe Library:
>>>> org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
>>>> InputFormat:
>>>> org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
>>>>
>>>> Actual:
>>>>
>>>> COLUMNS(
>>>>   col array<string> COMMENT "from deserializer"
>>>> )
>>>>
>>>> SerDe Library: org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
>>>> InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat
>>>>
>>>> ---
>>>>
>>>> Manually changing schema and storage restores access in Hive and
>>>> doesn't affect Spark. Note also that Hive's table property
>>>> "spark.sql.sources.schema" is correct. At first glance, it looks like
>>>> the schema data is serialized when sent to Hive but not deserialized
>>>> properly on receive.
>>>>
>>>> I'm tracing execution through source code... but before I get any
>>>> deeper, can anyone reproduce this behavior?
>>>>
>>>> Cheers,
>>>>
>>>> Christian
>>>>
>>>> --
>>>> Christian Perez
>>>> Silicon Valley Data Science
>>>> Data Analyst
>>>> christ...@svds.com
>>>> @cp_phd
>>>>
>>>>
>>>
>>
>
>
>
> --
> Christian Perez
> Silicon Valley Data Science
> Data Analyst
> christ...@svds.com
> @cp_phd



-- 
Christian Perez
Silicon Valley Data Science
Data Analyst
christ...@svds.com
@cp_phd




Re: saveAsTable broken in v1.3 DataFrames?

2015-03-19 Thread Christian Perez
Hi Yin,

Thanks for the clarification. My first reaction is that if this is the
intended behavior, it is a wasted opportunity. Why create a managed
table in Hive that cannot be read from inside Hive? I think I
understand now that you are essentially piggybacking on Hive's
metastore to persist table info between/across sessions, but I imagine
others might expect more (as I have.)

We find ourselves wanting to do work in Spark and persist the results
where other users (e.g. analysts using Tableau connected to
Hive/Impala) can explore it. I imagine this is very common. I can, of
course, save it as parquet and create an external table in hive (which
I will do now), but saveAsTable seems much less useful to me now.
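
As a point of reference, a hedged sketch of that external-table workaround,
reusing the df and columns from the original example, and assuming sqlContext
is a HiveContext, Hive 0.13+ Parquet support, and a hypothetical table name
and warehouse path:

// Rough sketch of the workaround: write Parquet from Spark, then register an
// external Hive table over the same location so Hive/Impala can read it too.
val path = "/user/hive/warehouse/spark_test_foo_ext"   // hypothetical location
df.saveAsParquetFile(path)                             // Spark 1.3 DataFrame API
sqlContext.sql(s"""
  CREATE EXTERNAL TABLE spark_test_foo_ext (education BIGINT, income BIGINT)
  STORED AS PARQUET
  LOCATION '$path'
""")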

Any other opinions?

Cheers,

C

On Thu, Mar 19, 2015 at 9:18 AM, Yin Huai  wrote:
> I meant table properties and serde properties are used to store metadata of
> a Spark SQL data source table. We do not set other fields like SerDe lib.
> For a user, the output of DESCRIBE EXTENDED/FORMATTED on a data source table
> should not show unrelated stuff like Serde lib and InputFormat. I have
> created https://issues.apache.org/jira/browse/SPARK-6413 to track the
> improvement on the output of DESCRIBE statement.
>
> On Thu, Mar 19, 2015 at 12:11 PM, Yin Huai  wrote:
>>
>> Hi Christian,
>>
>> Your table is stored correctly in Parquet format.
>>
>> For saveAsTable, the table created is not a Hive table, but a Spark SQL
>> data source table
>> (http://spark.apache.org/docs/1.3.0/sql-programming-guide.html#data-sources).
>> We are only using Hive's metastore to store the metadata (to be specific,
>> only table properties and serde properties). When you look at table
>> property, there will be a field called "spark.sql.sources.provider" and the
>> value will be "org.apache.spark.sql.parquet.DefaultSource". You can also
>> look at your files in the file system. They are stored by Parquet.
>>
>> Thanks,
>>
>> Yin
>>
>> On Thu, Mar 19, 2015 at 12:00 PM, Christian Perez 
>> wrote:
>>>
>>> Hi all,
>>>
>>> DataFrame.saveAsTable creates a managed table in Hive (v0.13 on
>>> CDH5.3.2) in both spark-shell and pyspark, but creates the *wrong*
>>> schema _and_ storage format in the Hive metastore, so that the table
>>> cannot be read from inside Hive. Spark itself can read the table, but
>>> Hive throws a Serialization error because it doesn't know it is
>>> Parquet.
>>>
>>> val df = sc.parallelize( Array((1,2), (3,4)) ).toDF("education",
>>> "income")
>>> df.saveAsTable("spark_test_foo")
>>>
>>> Expected:
>>>
>>> COLUMNS(
>>>   education BIGINT,
>>>   income BIGINT
>>> )
>>>
>>> SerDe Library:
>>> org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
>>> InputFormat:
>>> org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
>>>
>>> Actual:
>>>
>>> COLUMNS(
>>>   col array<string> COMMENT "from deserializer"
>>> )
>>>
>>> SerDe Library: org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
>>> InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat
>>>
>>> ---
>>>
>>> Manually changing schema and storage restores access in Hive and
>>> doesn't affect Spark. Note also that Hive's table property
>>> "spark.sql.sources.schema" is correct. At first glance, it looks like
>>> the schema data is serialized when sent to Hive but not deserialized
>>> properly on receive.
>>>
>>> I'm tracing execution through source code... but before I get any
>>> deeper, can anyone reproduce this behavior?
>>>
>>> Cheers,
>>>
>>> Christian
>>>
>>> --
>>> Christian Perez
>>> Silicon Valley Data Science
>>> Data Analyst
>>> christ...@svds.com
>>> @cp_phd
>>>
>>>
>>
>



-- 
Christian Perez
Silicon Valley Data Science
Data Analyst
christ...@svds.com
@cp_phd




saveAsTable broken in v1.3 DataFrames?

2015-03-19 Thread Christian Perez
Hi all,

DataFrame.saveAsTable creates a managed table in Hive (v0.13 on
CDH5.3.2) in both spark-shell and pyspark, but creates the *wrong*
schema _and_ storage format in the Hive metastore, so that the table
cannot be read from inside Hive. Spark itself can read the table, but
Hive throws a Serialization error because it doesn't know it is
Parquet.

val df = sc.parallelize( Array((1,2), (3,4)) ).toDF("education", "income")
df.saveAsTable("spark_test_foo")

Expected:

COLUMNS(
  education BIGINT,
  income BIGINT
)

SerDe Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat

Actual:

COLUMNS(
  col array<string> COMMENT "from deserializer"
)

SerDe Library: org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat

---

Manually changing schema and storage restores access in Hive and
doesn't affect Spark. Note also that Hive's table property
"spark.sql.sources.schema" is correct. At first glance, it looks like
the schema data is serialized when sent to Hive but not deserialized
properly on receive.

I'm tracing execution through source code... but before I get any
deeper, can anyone reproduce this behavior?

Cheers,

Christian

-- 
Christian Perez
Silicon Valley Data Science
Data Analyst
christ...@svds.com
@cp_phd




Re: Problem starting worker processes in standalone mode

2014-03-24 Thread Yonathan Perez
Oh, I also forgot to mention:

I start the master and workers (call ./sbin/start-all.sh), and then start
the shell:
MASTER=spark://localhost:7077 ./bin/spark-shell

Then I get the exceptions...

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-starting-worker-processes-in-standalone-mode-tp3102p3103.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Problem starting worker processes in standalone mode

2014-03-24 Thread Yonathan Perez
Hi,

I'm running my program on a single large-memory, many-core machine (64 cores,
1TB RAM). But to avoid having huge JVMs, I want to use several processes /
worker instances - each using 8 cores (i.e. use SPARK_WORKER_INSTANCES).
When I use 2 worker instances, everything works fine, but when I try using 4
or more worker instances and start the spark-shell, I get the following
exceptions from the workers:

14/03/24 08:18:51 ERROR ActorSystemImpl: Uncaught fatal error from thread
[spark-akka.actor.default-dispatcher-3] shutting down ActorSystem [spark]
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:691)
at
scala.concurrent.forkjoin.ForkJoinPool.tryAddWorker(ForkJoinPool.java:1672)
at
scala.concurrent.forkjoin.ForkJoinPool.signalWork(ForkJoinPool.java:1966)
at
scala.concurrent.forkjoin.ForkJoinPool.externalPush(ForkJoinPool.java:1829)
at 
scala.concurrent.forkjoin.ForkJoinPool.execute(ForkJoinPool.java:2955)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool.execute(AbstractDispatcher.scala:374)
at
akka.dispatch.ExecutorServiceDelegate$class.execute(ThreadPoolBuilder.scala:212)
at
akka.dispatch.Dispatcher$LazyExecutorServiceDelegate.execute(Dispatcher.scala:43)
at akka.dispatch.Dispatcher.registerForExecution(Dispatcher.scala:118)
at akka.dispatch.Dispatcher.dispatch(Dispatcher.scala:59)
at akka.actor.dungeon.Dispatch$class.sendMessage(Dispatch.scala:120)
at akka.actor.ActorCell.sendMessage(ActorCell.scala:338)
at akka.actor.Cell$class.sendMessage(ActorCell.scala:259)
at akka.actor.ActorCell.sendMessage(ActorCell.scala:338)
at akka.actor.LocalActorRef.$bang(ActorRef.scala:389)
at akka.actor.Scheduler$$anon$8.run(Scheduler.scala:62)
at
akka.actor.LightArrayRevolverScheduler$$anon$3$$anon$2.run(Scheduler.scala:241)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:42)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

*The config file spark-env.sh contains:*
export JAVA_HOME=/usr/java/jdk1.7.0_09
export PATH=/usr/java/jdk1.7.0_09/bin/:$PATH

export SPARK_JAVA_OPTS="-Dspark.executor.memory=80g
-Dspark.local.dir=/lfs/local/0/yonathan/tmp
-Dspark.serializer=org.apache.spark.serializer.KryoSerializer
-Dspark.kryo.registrator=org.apache.spark.graphx.GraphKryoRegistrator
-Xms80g -Xmx80g
-XX:-UseGCOverheadLimit -verbose:gc -XX:+PrintGCDetails
-XX:+PrintGCTimeStamps"

export SPARK_WORKER_CORES=8
export SPARK_WORKER_MEMORY=80g
export SPARK_EXECUTOR_MEMORY=80g
export SPARK_DRIVER_MEMORY=10g
export SPARK_DAEMON_MEMORY=10g
export SPARK_WORKER_INSTANCES=4
export SPARK_DAEMON_JAVA_OPTS="-Xms10g -Xmx10g"

I use *Spark-0.9.0*

I would appreciate any help or advice on the subject.

Thanks!





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-starting-worker-processes-in-standalone-mode-tp3102.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: OutOfMemoryError when loading input file

2014-03-03 Thread Yonathan Perez
Thanks for your answer yxzhao, but setting SPARK_MEM doesn't solve the
problem. 
I also understand that setting SPARK_MEM is the same as calling
SparkConf.set("spark.executor.memory",..) which I do.

Any additional advice would be highly appreciated.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemoryError-when-loading-input-file-tp2213p2246.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


OutOfMemoryError when loading input file

2014-03-01 Thread Yonathan Perez
Hello,

I'm trying to run a simple test program that loads a large file (~12.4GB)
into memory of a single many-core machine.
The machine I'm using has more than enough memory (1TB RAM) and 64 cores
(of which I want to use 16 for worker threads).
Even though I set both the executor memory (spark.executor.memory) to 200GB
in SparkContext and set the JVM memory to 200GB (-Xmx200g) in spark-env.sh,
I keep getting errors when trying to load input:
"java.lang.OutOfMemoryError: GC overhead limit exceeded".
I believe that the memory configuration parameters I pass do not stick, as
I get the following message when running:
"14/03/01 22:09:31 INFO storage.MemoryStore: MemoryStore started with
capacity 883.2 MB."
Obviously I'm missing something when configuring Spark, but I can't figure
out what, and I'd appreciate your help.
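
One small hedged diagnostic that may help: in local mode the MemoryStore capacity
is derived from the driver JVM's own max heap (scaled by spark.storage.memoryFraction,
default 0.6), not from spark.executor.memory, so printing the heap shows whether the
-Xmx setting actually reached the process:

// Hedged diagnostic sketch: compare this JVM's max heap against the
// "MemoryStore started with capacity ..." line in the logs.
val maxHeapMb = Runtime.getRuntime.maxMemory / (1024L * 1024L)
val storageFraction = 0.6   // assumed default for spark.storage.memoryFraction
println(s"driver max heap: ${maxHeapMb} MB; expected MemoryStore capacity: ~${(maxHeapMb * storageFraction).toLong} MB")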

The test program I'm running (not through shell, but as a standalone scala
app):

import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object LoadBenchmark {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster("local[16]")
      .setAppName("Load Benchmark")
      .set("spark.executor.memory", "200g")
    val sc = new SparkContext(conf)
    println("LOADING INPUT FILE")
    val edges = sc.textFile("/lfs/madmax/0/yonathan/half_twitter_rv.txt").cache()
    val cnt = edges.count()
    println("edge count: " + cnt)
  }
}

The contents of the spark-env.sh file:

# Examples of app-wide options : -Dspark.serializer
SPARK_JAVA_OPTS+="-Xms200g -Xmx200g -XX:-UseGCOverheadLimit"
export SPARK_JAVA_OPTS
# If using the standalone deploy mode, you can also set variables for it here:
# - SPARK_MASTER_IP, to bind the master to a different IP address or hostname
# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports
# - SPARK_WORKER_CORES, to set the number of cores to use on this machine
SPARK_WORKER_CORES=16
export SPARK_WORKER_CORES
# - SPARK_WORKER_MEMORY, to set how much memory to use (e.g. 1000m, 2g)
SPARK_WORKER_MEMORY=200g
export SPARK_WORKER_MEMORY
# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT
# - SPARK_WORKER_INSTANCES, to set the number of worker processes per node
# - SPARK_WORKER_DIR, to set the working directory of worker processes

Thank you!