Re: Do we need partitioning while loading data from JDBC sources?

2024-06-10 Thread Gourav Sengupta
Hi,

Another thing to consider while parallelising connections to the upstream
sources is that you are querying the source system with several concurrent
requests, which causes usage spikes. If the source system is already facing a
lot of requests from production workloads, the best time to parallelise
workloads from upstream systems is during their off-peak hours.

It also makes sense to replicate data when there are fewer changes happening in
the upstream system. Sometimes, though, businesses may need very low-latency
replication; in such cases using reader instances (if you are on AWS RDS),
reading the database redo log files, or replicating them to brokers helps.

Each individual partition is processed independently. If upsert operations run
on the target, so that multiple upsert/merge operations could end up running
against it at the same time, the best strategy is to first replicate the table
into a staging area and then perform a single upsert/merge operation against
the target, as sketched below.
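A minimal sketch of that staging-then-merge pattern (the table names, the key
column and the assumption of a Delta Lake / ACID target table are all mine,
purely for illustration):

# Hypothetical sketch: land the parallel extract in a staging table first,
# then apply one merge against the target. Assumes a Delta Lake (or similar
# ACID) table so that MERGE INTO is available; names are placeholders.
final_df.write.mode("overwrite").saveAsTable("staging.events")

spark.sql("""
    MERGE INTO prod.events AS t
    USING staging.events AS s
    ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")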


Regards,
Gourav Sengupta

On Fri, Jun 7, 2024 at 1:01 AM Perez  wrote:

> Also can I take my lower bound starting from 1 or is it index?
>
> On Thu, Jun 6, 2024 at 8:42 PM Perez  wrote:
>
>> Thanks again Mich. It gives the clear picture but I have again couple of
>> doubts:
>>
>> 1) I know that there will be multiple threads that will be executed with
>> 10 segment sizes each until the upper bound is reached but I didn't get
>> this part of the code exactly segments = [(i, min(i + segment_size,
>> upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]
>>
>> 2) Also performing union on these small dataframes won't impact
>> performance right? since spark has to shuffle and combine less data from
>> these dataframes?
>>
>>
>> On Thu, Jun 6, 2024 at 3:53 PM Mich Talebzadeh 
>> wrote:
>>
>>> well you can dynamically determine the upper bound by first querying the
>>> database to find the maximum value of the partition column and use it as
>>> the upper bound for your partitioning logic.
>>>
>>> def get_max_value(spark, mongo_config, column_name):
>>> max_value_df =
>>> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>>> max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
>>> return max_value
>>>
>>> # Define your MongoDB config without the bounds first
>>> mongo_config_base = {
>>> "uri": "mongodb://username:password@host:port/database.collection",
>>> "partitionColumn": "_id"
>>> }
>>>
>>> # Fetch the dynamic upper bound
>>> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>>>
>>> # Define your segment size
>>> segment_size = 10
>>> lower_bound = 0
>>> segments = [(i, min(i + segment_size, upper_bound)) for i in
>>> range(lower_bound, upper_bound, segment_size)]
>>>
>>> Then you need to aggregate DF from multiple threads When loading data in
>>> parallel, each thread will load a segment of data into its own DataFrame.
>>> To aggregate all these DataFrames into a single DataFrame, you can use t*he
>>> union method in PySpark.*
>>>
>>> from concurrent.futures import ThreadPoolExecutor, as_completed
>>> from pyspark.sql import SparkSession
>>>
>>> def extract_data_from_mongodb(mongo_config):
>>> df =
>>> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>>> return df
>>>
>>> # Function to get the maximum value of the partition column
>>> def get_max_value(spark, mongo_config, column_name):
>>> max_value_df =
>>> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>>> max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
>>> return max_value
>>>
>>> # MongoDB configuration without bounds
>>> mongo_config_base = {
>>> "uri": "mongodb://username:password@host:port/database.collection",
>>> "partitionColumn": "_id"
>>> }
>>>
>>> # Initialize Spark session
>>> spark = SparkSession.builder \
>>> .appName("MongoDBDataLoad") \
>>> .config("spark.mongodb.input.uri", 
>>> "mongodb://username:password@host:port/database.collection")
>>> \
>>> .getOrCreate()
>>>
>>> # Fetch the dynamic upper bound
>>&

Re: Do we need partitioning while loading data from JDBC sources?

2024-06-06 Thread Perez
Also, can I take my lower bound starting from 1, or is it the index?

On Thu, Jun 6, 2024 at 8:42 PM Perez  wrote:

> Thanks again Mich. It gives the clear picture but I have again couple of
> doubts:
>
> 1) I know that there will be multiple threads that will be executed with
> 10 segment sizes each until the upper bound is reached but I didn't get
> this part of the code exactly segments = [(i, min(i + segment_size,
> upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]
>
> 2) Also performing union on these small dataframes won't impact
> performance right? since spark has to shuffle and combine less data from
> these dataframes?
>
>
> On Thu, Jun 6, 2024 at 3:53 PM Mich Talebzadeh 
> wrote:
>
>> well you can dynamically determine the upper bound by first querying the
>> database to find the maximum value of the partition column and use it as
>> the upper bound for your partitioning logic.
>>
>> def get_max_value(spark, mongo_config, column_name):
>> max_value_df =
>> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>> max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
>> return max_value
>>
>> # Define your MongoDB config without the bounds first
>> mongo_config_base = {
>> "uri": "mongodb://username:password@host:port/database.collection",
>> "partitionColumn": "_id"
>> }
>>
>> # Fetch the dynamic upper bound
>> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>>
>> # Define your segment size
>> segment_size = 10
>> lower_bound = 0
>> segments = [(i, min(i + segment_size, upper_bound)) for i in
>> range(lower_bound, upper_bound, segment_size)]
>>
>> Then you need to aggregate DF from multiple threads When loading data in
>> parallel, each thread will load a segment of data into its own DataFrame.
>> To aggregate all these DataFrames into a single DataFrame, you can use t*he
>> union method in PySpark.*
>>
>> from concurrent.futures import ThreadPoolExecutor, as_completed
>> from pyspark.sql import SparkSession
>>
>> def extract_data_from_mongodb(mongo_config):
>> df =
>> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>> return df
>>
>> # Function to get the maximum value of the partition column
>> def get_max_value(spark, mongo_config, column_name):
>> max_value_df =
>> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>> max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
>> return max_value
>>
>> # MongoDB configuration without bounds
>> mongo_config_base = {
>> "uri": "mongodb://username:password@host:port/database.collection",
>> "partitionColumn": "_id"
>> }
>>
>> # Initialize Spark session
>> spark = SparkSession.builder \
>> .appName("MongoDBDataLoad") \
>> .config("spark.mongodb.input.uri", 
>> "mongodb://username:password@host:port/database.collection")
>> \
>> .getOrCreate()
>>
>> # Fetch the dynamic upper bound
>> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>>
>> # Define your segment size
>> segment_size = 10
>> lower_bound = 0
>> segments = [(i, min(i + segment_size, upper_bound)) for i in
>> range(lower_bound, upper_bound, segment_size)]
>>
>> # Function to load a segment
>> def load_segment(segment):
>> segment_lower_bound, segment_upper_bound = segment
>> mongo_config = mongo_config_base.copy()
>> mongo_config["lowerBound"] = str(segment_lower_bound)
>> mongo_config["upperBound"] = str(segment_upper_bound)
>> return extract_data_from_mongodb(mongo_config)
>>
>> # Collect all DataFrames from threads
>> all_dfs = []
>>
>> with ThreadPoolExecutor() as executor:
>> futures = [executor.submit(load_segment, segment) for segment in
>> segments]
>> for future in as_completed(futures):
>> try:
>> df_segment = future.result()
>> all_dfs.append(df_segment)
>> except Exception as e:
>> print(f"Error: {e}")
>>
>> # Union all DataFrames into a single DataFrame
>> if all_dfs:
>> final_df = all_dfs[0]
>> for df in all_dfs[1:]:
>> final_df = final_df.uni

Re: Do we need partitioning while loading data from JDBC sources?

2024-06-06 Thread Perez
Thanks again Mich. That gives a clear picture, but I again have a couple of
doubts:

1) I know that there will be multiple threads executed, each handling a segment
of size 10, until the upper bound is reached, but I didn't understand this part
of the code exactly: segments = [(i, min(i + segment_size, upper_bound)) for i
in range(lower_bound, upper_bound, segment_size)]

2) Also, performing a union on these small DataFrames won't impact performance,
right? Since Spark has to shuffle and combine less data from these DataFrames?


On Thu, Jun 6, 2024 at 3:53 PM Mich Talebzadeh 
wrote:

> well you can dynamically determine the upper bound by first querying the
> database to find the maximum value of the partition column and use it as
> the upper bound for your partitioning logic.
>
> def get_max_value(spark, mongo_config, column_name):
> max_value_df =
> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
> max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
> return max_value
>
> # Define your MongoDB config without the bounds first
> mongo_config_base = {
> "uri": "mongodb://username:password@host:port/database.collection",
> "partitionColumn": "_id"
> }
>
> # Fetch the dynamic upper bound
> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>
> # Define your segment size
> segment_size = 10
> lower_bound = 0
> segments = [(i, min(i + segment_size, upper_bound)) for i in
> range(lower_bound, upper_bound, segment_size)]
>
> Then you need to aggregate DF from multiple threads When loading data in
> parallel, each thread will load a segment of data into its own DataFrame.
> To aggregate all these DataFrames into a single DataFrame, you can use t*he
> union method in PySpark.*
>
> from concurrent.futures import ThreadPoolExecutor, as_completed
> from pyspark.sql import SparkSession
>
> def extract_data_from_mongodb(mongo_config):
> df =
> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
> return df
>
> # Function to get the maximum value of the partition column
> def get_max_value(spark, mongo_config, column_name):
> max_value_df =
> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
> max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
> return max_value
>
> # MongoDB configuration without bounds
> mongo_config_base = {
> "uri": "mongodb://username:password@host:port/database.collection",
> "partitionColumn": "_id"
> }
>
> # Initialize Spark session
> spark = SparkSession.builder \
> .appName("MongoDBDataLoad") \
> .config("spark.mongodb.input.uri", 
> "mongodb://username:password@host:port/database.collection")
> \
> .getOrCreate()
>
> # Fetch the dynamic upper bound
> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>
> # Define your segment size
> segment_size = 10
> lower_bound = 0
> segments = [(i, min(i + segment_size, upper_bound)) for i in
> range(lower_bound, upper_bound, segment_size)]
>
> # Function to load a segment
> def load_segment(segment):
> segment_lower_bound, segment_upper_bound = segment
> mongo_config = mongo_config_base.copy()
> mongo_config["lowerBound"] = str(segment_lower_bound)
> mongo_config["upperBound"] = str(segment_upper_bound)
> return extract_data_from_mongodb(mongo_config)
>
> # Collect all DataFrames from threads
> all_dfs = []
>
> with ThreadPoolExecutor() as executor:
> futures = [executor.submit(load_segment, segment) for segment in
> segments]
> for future in as_completed(futures):
> try:
> df_segment = future.result()
> all_dfs.append(df_segment)
> except Exception as e:
> print(f"Error: {e}")
>
> # Union all DataFrames into a single DataFrame
> if all_dfs:
> final_df = all_dfs[0]
> for df in all_dfs[1:]:
> final_df = final_df.union(df)
>
> # Proceed with your final DataFrame
> final_df.show()
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
> London <https://en.wikipedia.org/wiki/Imperial_College_London>
> London, United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>

Re: Do we need partitioning while loading data from JDBC sources?

2024-06-06 Thread Mich Talebzadeh
Well, you can dynamically determine the upper bound by first querying the
database for the maximum value of the partition column and using it as the
upper bound of your partitioning logic.

def get_max_value(spark, mongo_config, column_name):
    max_value_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
    max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
    return max_value

# Define your MongoDB config without the bounds first
mongo_config_base = {
    "uri": "mongodb://username:password@host:port/database.collection",
    "partitionColumn": "_id"
}

# Fetch the dynamic upper bound
upper_bound = get_max_value(spark, mongo_config_base, "_id")

# Define your segment size
segment_size = 10
lower_bound = 0
segments = [(i, min(i + segment_size, upper_bound))
            for i in range(lower_bound, upper_bound, segment_size)]
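
To illustrate what that comprehension produces (the values below are made up,
not taken from your job): range() walks from the lower bound towards the upper
bound in steps of segment_size, and min() caps the final segment at the upper
bound.

# Hypothetical values, only to show the shape of the output
lower_bound, upper_bound, segment_size = 0, 25, 10
segments = [(i, min(i + segment_size, upper_bound))
            for i in range(lower_bound, upper_bound, segment_size)]
print(segments)  # [(0, 10), (10, 20), (20, 25)]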

Then you need to aggregate the DataFrames produced by the multiple threads.
When loading data in parallel, each thread loads a segment of data into its own
DataFrame. To aggregate all these DataFrames into a single DataFrame, you can
use the union method in PySpark.

from concurrent.futures import ThreadPoolExecutor, as_completed
from pyspark.sql import SparkSession

def extract_data_from_mongodb(mongo_config):
    df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
    return df

# Function to get the maximum value of the partition column
def get_max_value(spark, mongo_config, column_name):
    max_value_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
    max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
    return max_value

# MongoDB configuration without bounds
mongo_config_base = {
    "uri": "mongodb://username:password@host:port/database.collection",
    "partitionColumn": "_id"
}

# Initialize Spark session
spark = SparkSession.builder \
    .appName("MongoDBDataLoad") \
    .config("spark.mongodb.input.uri", "mongodb://username:password@host:port/database.collection") \
    .getOrCreate()

# Fetch the dynamic upper bound
upper_bound = get_max_value(spark, mongo_config_base, "_id")

# Define your segment size
segment_size = 10
lower_bound = 0
segments = [(i, min(i + segment_size, upper_bound))
            for i in range(lower_bound, upper_bound, segment_size)]

# Function to load a segment
def load_segment(segment):
    segment_lower_bound, segment_upper_bound = segment
    mongo_config = mongo_config_base.copy()
    mongo_config["lowerBound"] = str(segment_lower_bound)
    mongo_config["upperBound"] = str(segment_upper_bound)
    return extract_data_from_mongodb(mongo_config)

# Collect all DataFrames from threads
all_dfs = []

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(load_segment, segment) for segment in segments]
    for future in as_completed(futures):
        try:
            df_segment = future.result()
            all_dfs.append(df_segment)
        except Exception as e:
            print(f"Error: {e}")

# Union all DataFrames into a single DataFrame
if all_dfs:
    final_df = all_dfs[0]
    for df in all_dfs[1:]:
        final_df = final_df.union(df)

# Proceed with your final DataFrame
final_df.show()
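
As a side note (my own addition, not part of the original reply): DataFrame
union is a narrow operation that simply concatenates the partitions of its
inputs, so the loop above does not by itself trigger a shuffle. A slightly more
compact way to write the same fold:

from functools import reduce
from pyspark.sql import DataFrame

# Equivalent to the loop above: fold union over the per-segment DataFrames
final_df = reduce(DataFrame.union, all_dfs) if all_dfs else None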

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
London <https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".


On Thu, 6 Jun 2024 at 10:52, Perez  wrote:

> Thanks, Mich for your response. However, I have multiple doubts as below:
>
> 1) I am trying to load the data for the incremental batch so I am not sure
> what would be my upper bound. So what can we do?
> 2) So as each thread loads the desired segment size's data into a
> dataframe if I want to aggregate all the data from all the threads in a
> single dataframe what should I do? Keep on appending in a dataframe as it
> comes?
>
>
> On Thu, Jun 6, 2024 at 1:54 PM Mich Talebzadeh 
> wrote:
>
>> Yes, partitioning and parallel loading can significantly improve the
>> performance of data extraction from JDBC sources or databases like MongoDB.
>> This 

Re: Do we need partitioning while loading data from JDBC sources?

2024-06-06 Thread Perez
Thanks, Mich for your response. However, I have multiple doubts as below:

1) I am trying to load the data for the incremental batch so I am not sure
what would be my upper bound. So what can we do?
2) So as each thread loads the desired segment size's data into a dataframe
if I want to aggregate all the data from all the threads in a single
dataframe what should I do? Keep on appending in a dataframe as it comes?


On Thu, Jun 6, 2024 at 1:54 PM Mich Talebzadeh 
wrote:

> Yes, partitioning and parallel loading can significantly improve the
> performance of data extraction from JDBC sources or databases like MongoDB.
> This approach can leverage Spark's distributed computing capabilities,
> allowing you to load data in parallel, thus speeding up the overall data
> loading process.
>
> When loading data from JDBC sources, specifying partitioning options
> allows Spark to parallelize the data read operation. Here's how you can do
> it for a JDBC source:
>
> Something like below given the information provided
>
> from pyspark.sql import SparkSession
> from concurrent.futures import ThreadPoolExecutor, as_completed
>
> def extract_data_from_mongodb(mongo_config):
> df =
> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
> return df
>
> # MongoDB configuration
> mongo_config_template = {
> "uri": "mongodb://username:password@host:port/database.collection",
> "partitionColumn": "_id",
> "lowerBound": None,
> "upperBound": None
> }
>
> lower_bound = 0
> upper_bound = 200
> segment_size = 10
>
> # Create segments
> segments = [(i, min(i + segment_size, upper_bound)) for i in
> range(lower_bound, upper_bound, segment_size)]
>
> # Initialize Spark session
> spark = SparkSession.builder \
> .appName("MongoDBDataLoad") \
> .config("spark.mongodb.input.uri", 
> "mongodb://username:password@host:port/database.collection")
> \
> .getOrCreate()
>
> # Extract data in parallel using ThreadPoolExecutor
> def load_segment(segment):
> segment_lower_bound, segment_upper_bound = segment
> mongo_config = mongo_config_template.copy()
> mongo_config["lowerBound"] = str(segment_lower_bound)
> mongo_config["upperBound"] = str(segment_upper_bound)
> return extract_data_from_mongodb(mongo_config)
>
> with ThreadPoolExecutor() as executor:
> futures = [executor.submit(load_segment, segment) for segment in
> segments]
> for future in as_completed(futures):
> try:
> df_segment = future.result()
> # Process df_segment as needed
> except Exception as e:
> print(f"Error: {e}")
>
>
> ThreadPoolExecutor enables parallel execution of tasks using multiple
> threads. Each thread can be responsible for loading a segment of the data.
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
> London <https://en.wikipedia.org/wiki/Imperial_College_London> (voted 2nd
> best university in the world after MIT https://lnkd.in/eCPt6KTj)
> London, United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
> Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>
>
> On Thu, 6 Jun 2024 at 00:46, Perez  wrote:
>
>> Hello experts,
>>
>> I was just wondering if I could leverage the below thing to expedite the
>> loading of the data process in Spark.
>>
>>
>> def extract_data_from_mongodb(mongo_config): df =
>> glueContext.create_dynamic_frame.from_options( connection_type="mongodb",
>> connection_options=mongo_config ) return df
>>
>> mongo_config = { "connection.uri": "mongodb://url", "database": "",
>> "collection": "", "username": "", "password": "", "partitionColumn":"_id",
>> "lowerBound": str(lower_bound), "upperBound": str(upper_bound) }
>> lower_bound = 0 upper_bound = 200 segment_size = 10 segments = [(i, min(i
>> + segment_size, upper_bound)) for i in range(lower_bound, upper_bound,
>> segment_size)] with ThreadPoolExecutor() as executor: futures =
>> [executor.submit(execution, segment) for segment in segments] for future in
>> as_completed(futures): try: future.result() except Exception as e:
>> print(f"Error: {e}")
>>
>> I am trying to leverage the parallel threads to pull data in parallel. So
>> is it effective?
>>
>


Re: Do we need partitioning while loading data from JDBC sources?

2024-06-06 Thread Mich Talebzadeh
Yes, partitioning and parallel loading can significantly improve the
performance of data extraction from JDBC sources or databases like MongoDB.
This approach can leverage Spark's distributed computing capabilities,
allowing you to load data in parallel, thus speeding up the overall data
loading process.

When loading data from JDBC sources, specifying partitioning options allows
Spark to parallelize the data read operation. Here's how you can do it for
a JDBC source:
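
For reference, a minimal sketch of a partitioned JDBC read (the connection URL,
table name, column and bound values below are placeholders I made up):

# Partitioned JDBC read: Spark splits the partitionColumn range into
# numPartitions slices and issues one query per slice in parallel.
jdbc_df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://host:5432/database")  # placeholder URL
    .option("dbtable", "schema.table")                       # placeholder table
    .option("user", "username")
    .option("password", "password")
    .option("partitionColumn", "id")   # must be a numeric, date or timestamp column
    .option("lowerBound", "0")
    .option("upperBound", "200")
    .option("numPartitions", "20")     # 20 parallel range queries
    .load()
)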

Something like the below, given the information provided:

from pyspark.sql import SparkSession
from concurrent.futures import ThreadPoolExecutor, as_completed

def extract_data_from_mongodb(mongo_config):
    df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
    return df

# MongoDB configuration
mongo_config_template = {
    "uri": "mongodb://username:password@host:port/database.collection",
    "partitionColumn": "_id",
    "lowerBound": None,
    "upperBound": None
}

lower_bound = 0
upper_bound = 200
segment_size = 10

# Create segments
segments = [(i, min(i + segment_size, upper_bound))
            for i in range(lower_bound, upper_bound, segment_size)]

# Initialize Spark session
spark = SparkSession.builder \
    .appName("MongoDBDataLoad") \
    .config("spark.mongodb.input.uri", "mongodb://username:password@host:port/database.collection") \
    .getOrCreate()

# Extract data in parallel using ThreadPoolExecutor
def load_segment(segment):
    segment_lower_bound, segment_upper_bound = segment
    mongo_config = mongo_config_template.copy()
    mongo_config["lowerBound"] = str(segment_lower_bound)
    mongo_config["upperBound"] = str(segment_upper_bound)
    return extract_data_from_mongodb(mongo_config)

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(load_segment, segment) for segment in segments]
    for future in as_completed(futures):
        try:
            df_segment = future.result()
            # Process df_segment as needed
        except Exception as e:
            print(f"Error: {e}")


ThreadPoolExecutor enables parallel execution of tasks using multiple
threads. Each thread can be responsible for loading a segment of the data.

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
London <https://en.wikipedia.org/wiki/Imperial_College_London> (voted 2nd
best university in the world after MIT https://lnkd.in/eCPt6KTj)
London, United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".


On Thu, 6 Jun 2024 at 00:46, Perez  wrote:

> Hello experts,
>
> I was just wondering if I could leverage the below thing to expedite the
> loading of the data process in Spark.
>
>
> def extract_data_from_mongodb(mongo_config): df =
> glueContext.create_dynamic_frame.from_options( connection_type="mongodb",
> connection_options=mongo_config ) return df
>
> mongo_config = { "connection.uri": "mongodb://url", "database": "",
> "collection": "", "username": "", "password": "", "partitionColumn":"_id",
> "lowerBound": str(lower_bound), "upperBound": str(upper_bound) }
> lower_bound = 0 upper_bound = 200 segment_size = 10 segments = [(i, min(i
> + segment_size, upper_bound)) for i in range(lower_bound, upper_bound,
> segment_size)] with ThreadPoolExecutor() as executor: futures =
> [executor.submit(execution, segment) for segment in segments] for future in
> as_completed(futures): try: future.result() except Exception as e:
> print(f"Error: {e}")
>
> I am trying to leverage the parallel threads to pull data in parallel. So
> is it effective?
>


Do we need partitioning while loading data from JDBC sources?

2024-06-05 Thread Perez
Hello experts,

I was just wondering whether I could leverage the approach below to expedite
the data loading process in Spark.


def extract_data_from_mongodb(mongo_config):
    df = glueContext.create_dynamic_frame.from_options(
        connection_type="mongodb",
        connection_options=mongo_config
    )
    return df

mongo_config = {
    "connection.uri": "mongodb://url",
    "database": "",
    "collection": "",
    "username": "",
    "password": "",
    "partitionColumn": "_id",
    "lowerBound": str(lower_bound),
    "upperBound": str(upper_bound)
}

lower_bound = 0
upper_bound = 200
segment_size = 10
segments = [(i, min(i + segment_size, upper_bound))
            for i in range(lower_bound, upper_bound, segment_size)]

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(execution, segment) for segment in segments]
    for future in as_completed(futures):
        try:
            future.result()
        except Exception as e:
            print(f"Error: {e}")

I am trying to leverage the parallel threads to pull data in parallel. So
is it effective?


S3 committer for dynamic partitioning

2024-03-05 Thread Nikhil Goyal
Hi folks,
We have been following this doc

for writing data from a Spark job to S3. However, it fails when writing to
dynamic partitions. Any suggestions on what config should be used to avoid the
cost of renaming in S3?

Thanks
Nikhil


dataset partitioning algorithm implementation help

2021-12-23 Thread sam smith
Hello All,

I am replicating a paper's algorithm for a partitioning approach to
anonymizing datasets with Spark / Java, and would like to ask for some help
reviewing my 150 lines of code. My GitHub repo, linked below, contains both my
Java class and the related paper:

https://github.com/SamSmithDevs10/paperReplicationForReview

Thanks in advance.

Thanks.


Re: How default partitioning in spark is deployed

2021-03-16 Thread Attila Zsolt Piros
Oh, sure that was the reason. You can keep using the `foreachPartition` and
get the partition ID from the `TaskContext`:

scala> import org.apache.spark.TaskContext
import org.apache.spark.TaskContext

scala> myRDD.foreachPartition( e => {  println(TaskContext.getPartitionId +
":" + e.mkString(",")) } )
0:
1:
2:Animal(1,Lion)
3:
4:Animal(2,Elephant)
5:
6:
7:Animal(3,Jaguar)
8:
9:Animal(4,Tiger)
10:
11:Animal(5,Chetah)

scala>




On Tue, Mar 16, 2021 at 2:38 PM German Schiavon 
wrote:

> Hi all,
>
> I guess you could do something like this too:
>
> [image: Captura de pantalla 2021-03-16 a las 14.35.46.png]
>
> On Tue, 16 Mar 2021 at 13:31, Renganathan Mutthiah <
> renganatha...@gmail.com> wrote:
>
>> Hi Attila,
>>
>> Thanks for looking into this!
>>
>> I actually found the issue and it turned out to be that the print
>> statements misled me. The records are indeed stored in different partitions.
>> What happened is since the foreachpartition method is run parallelly by
>> different threads, they all printed the first line almost at the same time
>> and followed by data which is also printed at almost the same time. This
>> has given an appearance that all the data is stored in a single partition.
>> When I run the below code, I can see that the objects are stored in
>> different partitions of course!
>>
>> *myRDD.mapPartitionsWithIndex( (index,itr) => { itr.foreach( e =>
>> println("Index : " +index +" " + e)); itr}, true).collect()*
>>
>> Prints the below... (index: ?  the ? is actually the partition number)
>> *Index : 9 Animal(4,Tiger) Index : 4 Animal(2,Elephant) Index : 11
>> Animal(5,Chetah) Index : 2 Animal(1,Lion) Index : 7 Animal(3,Jaguar) *
>>
>> Thanks!
>>
>> On Tue, Mar 16, 2021 at 3:06 PM Attila Zsolt Piros <
>> piros.attila.zs...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> This is weird. The code of foreachPartition
>>> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L801-L806>
>>>  leads
>>> to ParallelCollectionRDD
>>> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L84-L107>
>>>  which
>>> ends in slice
>>> <https://github.com/apache/spark/blob/f643bd96593dc411cb0cca1c7a3f28f93765c9b6/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L116-L155>,
>>> where the most important part is the *positions* method:
>>>
>>>  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
>>>  (0 until numSlices).iterator.map { i =>
>>> val start = ((i * length) / numSlices).toInt
>>> val end = (((i + 1) * length) / numSlices).toInt
>>> (start, end)
>>>  }
>>>  }
>>>
>>> Because of the extra ' (' you used in "*parallelize( (Array*" I thought
>>> some scala implicit might generate a Seq with one Array in it.
>>> But in that case your output would contain an Array. So this must be not
>>> the case.
>>>
>>> 1) What Spark/Scala version you are using? on what OS?
>>>
>>> 2)  Can you reproduce this issue in the spark-shell?
>>>
>>> scala> case class Animal(id:Int, name:String)
>>> defined class Animal
>>>
>>> scala> val myRDD = sc.parallelize( (Array( Animal(1, "Lion"),
>>> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"
>>> Tiger"), Animal(5, "Chetah") ) ), 12)
>>> myRDD: org.apache.spark.rdd.RDD[Animal] = ParallelCollectionRDD[2] at
>>> parallelize at :27
>>>
>>> scala> myRDD.foreachPartition( e => { println("--");
>>> e.foreach(println) } )
>>> --
>>> --
>>> --
>>> Animal(1,Lion)
>>> --
>>> --
>>> Animal(2,Elephant)
>>> --
>>> --
>>> --
>>> Animal(3,Jaguar)
>>> --
>>> --
>>> Animal(4,Tiger)
>>> --
>>> --
>>> Animal(5,Chetah)
>>>
>>> scala> Console println myRDD.getNumPartitions
>>> 12
>>>
>>> 3) Can you please check spark-shell what happens when you paste the
>>> above method and call it like:
>>>
>>> scala> def positions(length: Long, numSlices: Int): Iterator[(Int, Int)]
>>> = {
>>> 

Re: How default partitioning in spark is deployed

2021-03-16 Thread Renganathan Mutthiah
That's a very good idea, thanks for sharing German!

On Tue, Mar 16, 2021 at 7:08 PM German Schiavon 
wrote:

> Hi all,
>
> I guess you could do something like this too:
>
> [image: Captura de pantalla 2021-03-16 a las 14.35.46.png]
>
> On Tue, 16 Mar 2021 at 13:31, Renganathan Mutthiah <
> renganatha...@gmail.com> wrote:
>
>> Hi Attila,
>>
>> Thanks for looking into this!
>>
>> I actually found the issue and it turned out to be that the print
>> statements misled me. The records are indeed stored in different partitions.
>> What happened is since the foreachpartition method is run parallelly by
>> different threads, they all printed the first line almost at the same time
>> and followed by data which is also printed at almost the same time. This
>> has given an appearance that all the data is stored in a single partition.
>> When I run the below code, I can see that the objects are stored in
>> different partitions of course!
>>
>> *myRDD.mapPartitionsWithIndex( (index,itr) => { itr.foreach( e =>
>> println("Index : " +index +" " + e)); itr}, true).collect()*
>>
>> Prints the below... (index: ?  the ? is actually the partition number)
>> *Index : 9 Animal(4,Tiger) Index : 4 Animal(2,Elephant) Index : 11
>> Animal(5,Chetah) Index : 2 Animal(1,Lion) Index : 7 Animal(3,Jaguar) *
>>
>> Thanks!
>>
>> On Tue, Mar 16, 2021 at 3:06 PM Attila Zsolt Piros <
>> piros.attila.zs...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> This is weird. The code of foreachPartition
>>> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L801-L806>
>>>  leads
>>> to ParallelCollectionRDD
>>> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L84-L107>
>>>  which
>>> ends in slice
>>> <https://github.com/apache/spark/blob/f643bd96593dc411cb0cca1c7a3f28f93765c9b6/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L116-L155>,
>>> where the most important part is the *positions* method:
>>>
>>>  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
>>>  (0 until numSlices).iterator.map { i =>
>>> val start = ((i * length) / numSlices).toInt
>>> val end = (((i + 1) * length) / numSlices).toInt
>>> (start, end)
>>>  }
>>>  }
>>>
>>> Because of the extra ' (' you used in "*parallelize( (Array*" I thought
>>> some scala implicit might generate a Seq with one Array in it.
>>> But in that case your output would contain an Array. So this must be not
>>> the case.
>>>
>>> 1) What Spark/Scala version you are using? on what OS?
>>>
>>> 2)  Can you reproduce this issue in the spark-shell?
>>>
>>> scala> case class Animal(id:Int, name:String)
>>> defined class Animal
>>>
>>> scala> val myRDD = sc.parallelize( (Array( Animal(1, "Lion"),
>>> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"
>>> Tiger"), Animal(5, "Chetah") ) ), 12)
>>> myRDD: org.apache.spark.rdd.RDD[Animal] = ParallelCollectionRDD[2] at
>>> parallelize at :27
>>>
>>> scala> myRDD.foreachPartition( e => { println("--");
>>> e.foreach(println) } )
>>> --
>>> --
>>> --
>>> Animal(1,Lion)
>>> --
>>> --
>>> Animal(2,Elephant)
>>> --
>>> --
>>> --
>>> Animal(3,Jaguar)
>>> --
>>> --
>>> Animal(4,Tiger)
>>> --
>>> --
>>> Animal(5,Chetah)
>>>
>>> scala> Console println myRDD.getNumPartitions
>>> 12
>>>
>>> 3) Can you please check spark-shell what happens when you paste the
>>> above method and call it like:
>>>
>>> scala> def positions(length: Long, numSlices: Int): Iterator[(Int, Int)]
>>> = {
>>>  |   (0 until numSlices).iterator.map { i =>
>>>  | val start = ((i * length) / numSlices).toInt
>>>  |   val end = (((i + 1) * length) / numSlices).toInt
>>>  |   (start, end)
>>>  |   }
>>>  | }
>>> positions: (length: Long, numSlices: Int)Iterator[(Int, Int)]
>>>
>>> scala> positions(5, 12).fo

Re: How default partitioning in spark is deployed

2021-03-16 Thread German Schiavon
Hi all,

I guess you could do something like this too:

[image: Captura de pantalla 2021-03-16 a las 14.35.46.png]

On Tue, 16 Mar 2021 at 13:31, Renganathan Mutthiah 
wrote:

> Hi Attila,
>
> Thanks for looking into this!
>
> I actually found the issue and it turned out to be that the print
> statements misled me. The records are indeed stored in different partitions.
> What happened is since the foreachpartition method is run parallelly by
> different threads, they all printed the first line almost at the same time
> and followed by data which is also printed at almost the same time. This
> has given an appearance that all the data is stored in a single partition.
> When I run the below code, I can see that the objects are stored in
> different partitions of course!
>
> *myRDD.mapPartitionsWithIndex( (index,itr) => { itr.foreach( e =>
> println("Index : " +index +" " + e)); itr}, true).collect()*
>
> Prints the below... (index: ?  the ? is actually the partition number)
> *Index : 9 Animal(4,Tiger) Index : 4 Animal(2,Elephant) Index : 11
> Animal(5,Chetah) Index : 2 Animal(1,Lion) Index : 7 Animal(3,Jaguar) *
>
> Thanks!
>
> On Tue, Mar 16, 2021 at 3:06 PM Attila Zsolt Piros <
> piros.attila.zs...@gmail.com> wrote:
>
>> Hi!
>>
>> This is weird. The code of foreachPartition
>> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L801-L806>
>>  leads
>> to ParallelCollectionRDD
>> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L84-L107>
>>  which
>> ends in slice
>> <https://github.com/apache/spark/blob/f643bd96593dc411cb0cca1c7a3f28f93765c9b6/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L116-L155>,
>> where the most important part is the *positions* method:
>>
>>  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
>>  (0 until numSlices).iterator.map { i =>
>> val start = ((i * length) / numSlices).toInt
>> val end = (((i + 1) * length) / numSlices).toInt
>> (start, end)
>>  }
>>  }
>>
>> Because of the extra ' (' you used in "*parallelize( (Array*" I thought
>> some scala implicit might generate a Seq with one Array in it.
>> But in that case your output would contain an Array. So this must be not
>> the case.
>>
>> 1) What Spark/Scala version you are using? on what OS?
>>
>> 2)  Can you reproduce this issue in the spark-shell?
>>
>> scala> case class Animal(id:Int, name:String)
>> defined class Animal
>>
>> scala> val myRDD = sc.parallelize( (Array( Animal(1, "Lion"),
>> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"
>> Tiger"), Animal(5, "Chetah") ) ), 12)
>> myRDD: org.apache.spark.rdd.RDD[Animal] = ParallelCollectionRDD[2] at
>> parallelize at :27
>>
>> scala> myRDD.foreachPartition( e => { println("--");
>> e.foreach(println) } )
>> --
>> --
>> --
>> Animal(1,Lion)
>> --
>> --
>> Animal(2,Elephant)
>> --
>> --
>> --
>> Animal(3,Jaguar)
>> --
>> --
>> Animal(4,Tiger)
>> --
>> --
>> Animal(5,Chetah)
>>
>> scala> Console println myRDD.getNumPartitions
>> 12
>>
>> 3) Can you please check spark-shell what happens when you paste the above
>> method and call it like:
>>
>> scala> def positions(length: Long, numSlices: Int): Iterator[(Int, Int)]
>> = {
>>  |   (0 until numSlices).iterator.map { i =>
>>  | val start = ((i * length) / numSlices).toInt
>>  |   val end = (((i + 1) * length) / numSlices).toInt
>>  |   (start, end)
>>  |   }
>>  | }
>> positions: (length: Long, numSlices: Int)Iterator[(Int, Int)]
>>
>> scala> positions(5, 12).foreach(println)
>> (0,0)
>> (0,0)
>> (0,1)
>> (1,1)
>> (1,2)
>> (2,2)
>> (2,2)
>> (2,3)
>> (3,3)
>> (3,4)
>> (4,4)
>> (4,5)
>>
>> As you can see in my case the `positions` result consistent with the 
>> `foreachPartition`
>> and this should be deterministic.
>>
>> Best regards,
>> Attila
>>
>>
>> On Tue, Mar 16, 2021 at 5:34 AM Renganathan Mutthiah <
>> renganatha...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have 

Re: How default partitioning in spark is deployed

2021-03-16 Thread Renganathan Mutthiah
Hi Attila,

Thanks for looking into this!

I actually found the issue, and it turned out that the print statements misled
me. The records are indeed stored in different partitions.
What happened is that, since the foreachPartition method is run in parallel by
different threads, they all printed the separator line at almost the same time,
followed by the data, which was also printed at almost the same time. This gave
the appearance that all the data was stored in a single partition.
When I run the code below, I can see that the objects are of course stored in
different partitions!

myRDD.mapPartitionsWithIndex( (index, itr) => { itr.foreach( e =>
println("Index : " + index + " " + e)); itr }, true).collect()

Prints the below (in "Index : ?", the ? is the partition number):
Index : 9 Animal(4,Tiger)
Index : 4 Animal(2,Elephant)
Index : 11 Animal(5,Chetah)
Index : 2 Animal(1,Lion)
Index : 7 Animal(3,Jaguar)

Thanks!

On Tue, Mar 16, 2021 at 3:06 PM Attila Zsolt Piros <
piros.attila.zs...@gmail.com> wrote:

> Hi!
>
> This is weird. The code of foreachPartition
> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L801-L806>
>  leads
> to ParallelCollectionRDD
> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L84-L107>
>  which
> ends in slice
> <https://github.com/apache/spark/blob/f643bd96593dc411cb0cca1c7a3f28f93765c9b6/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L116-L155>,
> where the most important part is the *positions* method:
>
>  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
>  (0 until numSlices).iterator.map { i =>
> val start = ((i * length) / numSlices).toInt
> val end = (((i + 1) * length) / numSlices).toInt
> (start, end)
>  }
>  }
>
> Because of the extra ' (' you used in "*parallelize( (Array*" I thought
> some scala implicit might generate a Seq with one Array in it.
> But in that case your output would contain an Array. So this must be not
> the case.
>
> 1) What Spark/Scala version you are using? on what OS?
>
> 2)  Can you reproduce this issue in the spark-shell?
>
> scala> case class Animal(id:Int, name:String)
> defined class Animal
>
> scala> val myRDD = sc.parallelize( (Array( Animal(1, "Lion"),
> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"
> Tiger"), Animal(5, "Chetah") ) ), 12)
> myRDD: org.apache.spark.rdd.RDD[Animal] = ParallelCollectionRDD[2] at
> parallelize at :27
>
> scala> myRDD.foreachPartition( e => { println("--");
> e.foreach(println) } )
> --
> --
> --
> Animal(1,Lion)
> --
> --
> Animal(2,Elephant)
> --
> --
> --
> Animal(3,Jaguar)
> --
> --
> Animal(4,Tiger)
> --
> --
> Animal(5,Chetah)
>
> scala> Console println myRDD.getNumPartitions
> 12
>
> 3) Can you please check spark-shell what happens when you paste the above
> method and call it like:
>
> scala> def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
> {
>  |   (0 until numSlices).iterator.map { i =>
>  | val start = ((i * length) / numSlices).toInt
>  |   val end = (((i + 1) * length) / numSlices).toInt
>  |   (start, end)
>  |   }
>  | }
> positions: (length: Long, numSlices: Int)Iterator[(Int, Int)]
>
> scala> positions(5, 12).foreach(println)
> (0,0)
> (0,0)
> (0,1)
> (1,1)
> (1,2)
> (2,2)
> (2,2)
> (2,3)
> (3,3)
> (3,4)
> (4,4)
> (4,5)
>
> As you can see in my case the `positions` result consistent with the 
> `foreachPartition`
> and this should be deterministic.
>
> Best regards,
> Attila
>
>
> On Tue, Mar 16, 2021 at 5:34 AM Renganathan Mutthiah <
> renganatha...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a question with respect to default partitioning in RDD.
>>
>>
>>
>>
>> *case class Animal(id:Int, name:String)   val myRDD =
>> session.sparkContext.parallelize( (Array( Animal(1, "Lion"),
>> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"), Animal(5,
>> "Chetah") ) ))Console println myRDD.getNumPartitions  *
>>
>> I am running the above piece of code in my laptop which has 12 logical
>> cores.
>> Hence I see that there are 12 partitions created.
>>
>> My understanding is that hash partitioning is used to determine which
>> object needs to g

Re: How default partitioning in spark is deployed

2021-03-16 Thread Attila Zsolt Piros
Hi!

This is weird. The code of foreachPartition
<https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L801-L806>
leads
to ParallelCollectionRDD
<https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L84-L107>
which
ends in slice
<https://github.com/apache/spark/blob/f643bd96593dc411cb0cca1c7a3f28f93765c9b6/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L116-L155>,
where the most important part is the *positions* method:

def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
  (0 until numSlices).iterator.map { i =>
    val start = ((i * length) / numSlices).toInt
    val end = (((i + 1) * length) / numSlices).toInt
    (start, end)
  }
}

Because of the extra ' (' you used in "*parallelize( (Array*" I thought
some Scala implicit might generate a Seq with one Array in it.
But in that case your output would contain an Array, so this must not be
the case.

1) What Spark/Scala version are you using? On what OS?

2) Can you reproduce this issue in the spark-shell?

scala> case class Animal(id:Int, name:String)
defined class Animal

scala> val myRDD = sc.parallelize( (Array( Animal(1, "Lion"), Animal(2,"Elephant"),
         Animal(3,"Jaguar"), Animal(4,"Tiger"), Animal(5, "Chetah") ) ), 12)
myRDD: org.apache.spark.rdd.RDD[Animal] = ParallelCollectionRDD[2] at
parallelize at :27

scala> myRDD.foreachPartition( e => { println("--");
e.foreach(println) } )
--
--
--
Animal(1,Lion)
--
--
Animal(2,Elephant)
--
--
--
Animal(3,Jaguar)
--
--
Animal(4,Tiger)
--
--
Animal(5,Chetah)

scala> Console println myRDD.getNumPartitions
12

3) Can you please check in the spark-shell what happens when you paste the
above method and call it like this:

scala> def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
 |   (0 until numSlices).iterator.map { i =>
 | val start = ((i * length) / numSlices).toInt
 |   val end = (((i + 1) * length) / numSlices).toInt
 |   (start, end)
 |   }
 | }
positions: (length: Long, numSlices: Int)Iterator[(Int, Int)]

scala> positions(5, 12).foreach(println)
(0,0)
(0,0)
(0,1)
(1,1)
(1,2)
(2,2)
(2,2)
(2,3)
(3,3)
(3,4)
(4,4)
(4,5)

As you can see, in my case the `positions` result is consistent with the
`foreachPartition` output, and this should be deterministic.

Best regards,
Attila


On Tue, Mar 16, 2021 at 5:34 AM Renganathan Mutthiah <
renganatha...@gmail.com> wrote:

> Hi,
>
> I have a question with respect to default partitioning in RDD.
>
>
>
>
> *case class Animal(id:Int, name:String)   val myRDD =
> session.sparkContext.parallelize( (Array( Animal(1, "Lion"),
> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"), Animal(5,
> "Chetah") ) ))Console println myRDD.getNumPartitions  *
>
> I am running the above piece of code in my laptop which has 12 logical
> cores.
> Hence I see that there are 12 partitions created.
>
> My understanding is that hash partitioning is used to determine which
> object needs to go to which partition. So in this case, the formula would
> be: hashCode() % 12
> But when I further examine, I see all the RDDs are put in the last
> partition.
>
> *myRDD.foreachPartition( e => { println("--"); e.foreach(println)
> } )*
>
> Above code prints the below(first eleven partitions are empty and the last
> one has all the objects. The line is separate the partition contents):
> --
> --
> --
> --
> --
> --
> --
> --
> --
> --
> --
> --
> Animal(2,Elephant)
> Animal(4,Tiger)
> Animal(3,Jaguar)
> Animal(5,Chetah)
> Animal(1,Lion)
>
> I don't know why this happens. Can you please help.
>
> Thanks!
>


Re: How default partitioning in spark is deployed

2021-03-16 Thread Renganathan Mutthiah
Hi Mich,

Thanks for your precious time looking into my query. Yes, when we increase
the number of objects, all partitions start having the data. I actually
tried to understand what happens in my particular case.

Thanks!

On Tue, Mar 16, 2021 at 2:10 PM Mich Talebzadeh 
wrote:

> Hi,
>
> Well as it appears you have 5 entries in your data and 12 cores. The
> theory is that you run multiple tasks in parallel across multiple cores
> on a desktop which applies to your case. The statistics is not there to
> give a meaningful interpretation why Spark decided to put all data in one
> partition. If an RDD has too many partitions, then task scheduling may
> take more time than the actual execution time. In summary you just do not
> have enough statistics to draw a meaningful conclusion.
>
> Try to generate 100,000 rows and run your query and look at the pattern.
>
> HTH
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 16 Mar 2021 at 04:35, Renganathan Mutthiah <
> renganatha...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a question with respect to default partitioning in RDD.
>>
>>
>>
>>
>> *case class Animal(id:Int, name:String)   val myRDD =
>> session.sparkContext.parallelize( (Array( Animal(1, "Lion"),
>> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"), Animal(5,
>> "Chetah") ) ))Console println myRDD.getNumPartitions  *
>>
>> I am running the above piece of code in my laptop which has 12 logical
>> cores.
>> Hence I see that there are 12 partitions created.
>>
>> My understanding is that hash partitioning is used to determine which
>> object needs to go to which partition. So in this case, the formula would
>> be: hashCode() % 12
>> But when I further examine, I see all the RDDs are put in the last
>> partition.
>>
>> *myRDD.foreachPartition( e => { println("--"); e.foreach(println)
>> } )*
>>
>> Above code prints the below(first eleven partitions are empty and the
>> last one has all the objects. The line is separate the partition contents):
>> --
>> --
>> --
>> --
>> --
>> --
>> --
>> --
>> --
>> --
>> --
>> --
>> Animal(2,Elephant)
>> Animal(4,Tiger)
>> Animal(3,Jaguar)
>> Animal(5,Chetah)
>> Animal(1,Lion)
>>
>> I don't know why this happens. Can you please help.
>>
>> Thanks!
>>
>


Re: How default partitioning in spark is deployed

2021-03-16 Thread Mich Talebzadeh
Hi,

Well, as it appears, you have 5 entries in your data and 12 cores. The theory
is that you run multiple tasks in parallel across multiple cores on a desktop,
which applies to your case. There are not enough statistics to give a
meaningful interpretation of why Spark decided to put all the data in one
partition. If an RDD has too many partitions, then task scheduling may take
more time than the actual execution time. In summary, you simply do not have
enough statistics to draw a meaningful conclusion.

Try to generate 100,000 rows, run your query again and look at the pattern.
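
A minimal PySpark sketch of that experiment (my own illustration; the original
discussion uses Scala, where the equivalent calls are analogous):

df = spark.range(0, 100000)              # generate 100,000 rows
print(df.rdd.getNumPartitions())         # how many partitions were created
print(df.rdd.glom().map(len).collect())  # how many rows ended up in each partition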

HTH



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*





*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 16 Mar 2021 at 04:35, Renganathan Mutthiah 
wrote:

> Hi,
>
> I have a question with respect to default partitioning in RDD.
>
>
>
>
> *case class Animal(id:Int, name:String)   val myRDD =
> session.sparkContext.parallelize( (Array( Animal(1, "Lion"),
> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"), Animal(5,
> "Chetah") ) ))Console println myRDD.getNumPartitions  *
>
> I am running the above piece of code in my laptop which has 12 logical
> cores.
> Hence I see that there are 12 partitions created.
>
> My understanding is that hash partitioning is used to determine which
> object needs to go to which partition. So in this case, the formula would
> be: hashCode() % 12
> But when I further examine, I see all the RDDs are put in the last
> partition.
>
> *myRDD.foreachPartition( e => { println("--"); e.foreach(println)
> } )*
>
> Above code prints the below(first eleven partitions are empty and the last
> one has all the objects. The line is separate the partition contents):
> --
> --
> --
> --
> --
> --
> --
> --
> --
> --
> --
> --
> Animal(2,Elephant)
> Animal(4,Tiger)
> Animal(3,Jaguar)
> Animal(5,Chetah)
> Animal(1,Lion)
>
> I don't know why this happens. Can you please help.
>
> Thanks!
>


How default partitioning in spark is deployed

2021-03-15 Thread Renganathan Mutthiah
Hi,

I have a question with respect to default partitioning in RDD.




case class Animal(id: Int, name: String)

val myRDD = session.sparkContext.parallelize( (Array( Animal(1, "Lion"),
  Animal(2, "Elephant"), Animal(3, "Jaguar"), Animal(4, "Tiger"), Animal(5, "Chetah") ) ))

Console println myRDD.getNumPartitions

I am running the above piece of code on my laptop, which has 12 logical cores.
Hence I see that 12 partitions are created.

My understanding is that hash partitioning is used to determine which object
needs to go to which partition. So in this case, the formula would be:
hashCode() % 12.
But when I examine further, I see that all the objects are put in the last
partition.

myRDD.foreachPartition( e => { println("--"); e.foreach(println) } )

The above code prints the below (the first eleven partitions are empty and the
last one has all the objects; the dashed lines separate the partition contents):
--
--
--
--
--
--
--
--
--
--
--
--
Animal(2,Elephant)
Animal(4,Tiger)
Animal(3,Jaguar)
Animal(5,Chetah)
Animal(1,Lion)

I don't know why this happens. Can you please help.

Thanks!


Re: Renaming a DataFrame column makes Spark lose partitioning information

2020-08-05 Thread Antoine Wendlinger
Well, that's great! Thank you very much :)


Antoine

On Tue, Aug 4, 2020 at 11:22 PM Terry Kim  wrote:

> This is fixed in Spark 3.0 by https://github.com/apache/spark/pull/26943:
>
> scala> :paste
> // Entering paste mode (ctrl-D to finish)
>
> Seq((1, 2))
>   .toDF("a", "b")
>   .repartition($"b")
>   .withColumnRenamed("b", "c")
>   .repartition($"c")
>   .explain()
>
> // Exiting paste mode, now interpreting.
>
> == Physical Plan ==
> *(1) Project [a#7, b#8 AS c#11]
> +- Exchange hashpartitioning(b#8, 200), false, [id=#12]
>+- LocalTableScan [a#7, b#8]
>
> Thanks,
> Terry
>
> On Tue, Aug 4, 2020 at 6:26 AM Antoine Wendlinger <
> awendlin...@mytraffic.fr> wrote:
>
>> Hi,
>>
>> When renaming a DataFrame column, it looks like Spark is forgetting the
>> partition information:
>>
>> Seq((1, 2))
>>   .toDF("a", "b")
>>   .repartition($"b")
>>   .withColumnRenamed("b", "c")
>>   .repartition($"c")
>>   .explain()
>>
>> Gives the following plan:
>>
>> == Physical Plan ==
>> Exchange hashpartitioning(c#40, 10)
>> +- *(1) Project [a#36, b#37 AS c#40]
>>+- Exchange hashpartitioning(b#37, 10)
>>   +- LocalTableScan [a#36, b#37]
>>
>> As you can see, two shuffles are done, but the second one is unnecessary.
>> Is there a reason I don't know for this behavior ? Is there a way to work
>> around it (other than not renaming my columns) ?
>>
>> I'm using Spark 2.4.3.
>>
>>
>> Thanks for your help,
>>
>> Antoine
>>
>


Re: Renaming a DataFrame column makes Spark lose partitioning information

2020-08-04 Thread Terry Kim
This is fixed in Spark 3.0 by https://github.com/apache/spark/pull/26943:

scala> :paste
// Entering paste mode (ctrl-D to finish)

Seq((1, 2))
  .toDF("a", "b")
  .repartition($"b")
  .withColumnRenamed("b", "c")
  .repartition($"c")
  .explain()

// Exiting paste mode, now interpreting.

== Physical Plan ==
*(1) Project [a#7, b#8 AS c#11]
+- Exchange hashpartitioning(b#8, 200), false, [id=#12]
   +- LocalTableScan [a#7, b#8]

Thanks,
Terry

On Tue, Aug 4, 2020 at 6:26 AM Antoine Wendlinger 
wrote:

> Hi,
>
> When renaming a DataFrame column, it looks like Spark is forgetting the
> partition information:
>
> Seq((1, 2))
>   .toDF("a", "b")
>   .repartition($"b")
>   .withColumnRenamed("b", "c")
>   .repartition($"c")
>   .explain()
>
> Gives the following plan:
>
> == Physical Plan ==
> Exchange hashpartitioning(c#40, 10)
> +- *(1) Project [a#36, b#37 AS c#40]
>+- Exchange hashpartitioning(b#37, 10)
>   +- LocalTableScan [a#36, b#37]
>
> As you can see, two shuffles are done, but the second one is unnecessary.
> Is there a reason I don't know for this behavior ? Is there a way to work
> around it (other than not renaming my columns) ?
>
> I'm using Spark 2.4.3.
>
>
> Thanks for your help,
>
> Antoine
>


Renaming a DataFrame column makes Spark lose partitioning information

2020-08-04 Thread Antoine Wendlinger
Hi,

When renaming a DataFrame column, it looks like Spark is forgetting the
partition information:

Seq((1, 2))
  .toDF("a", "b")
  .repartition($"b")
  .withColumnRenamed("b", "c")
  .repartition($"c")
  .explain()

Gives the following plan:

== Physical Plan ==
Exchange hashpartitioning(c#40, 10)
+- *(1) Project [a#36, b#37 AS c#40]
   +- Exchange hashpartitioning(b#37, 10)
  +- LocalTableScan [a#36, b#37]

As you can see, two shuffles are done, but the second one is unnecessary.
Is there a reason I don't know for this behavior ? Is there a way to work
around it (other than not renaming my columns) ?

I'm using Spark 2.4.3.


Thanks for your help,

Antoine
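
For reference, a minimal PySpark sketch of a possible workaround on 2.4 (not a fix,
just a reordering): rename before the repartition, so only the single shuffle on the
new name appears in the plan.

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = (spark.createDataFrame([(1, 2)], ["a", "b"])
      .withColumnRenamed("b", "c")   # rename first...
      .repartition("c"))             # ...then repartition by the new name
df.explain()  # expect a single Exchange hashpartitioning(c, ...)
```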


Partitioning query

2019-09-13 Thread ☼ R Nair
Hi,

We are running Spark JDBC code to pull data from Oracle, with some 200
partitions. Sometimes we are seeing that some tasks are failing or not
moving forward.

Is there any way we can see/find the queries responsible for each
partition or task? How do we enable this?

Thanks

Best,
Ravion
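
A hedged sketch of where those per-partition queries come from; the connection
details, table and column names below are placeholders, and the exact SQL each task
sends is easiest to confirm from the Oracle side (e.g. its session/statement
monitoring).

```
# Spark splits [lowerBound, upperBound) into numPartitions strides on
# partitionColumn, and each task reads one slice of the table.
df = (spark.read.format("jdbc")
      .option("url", "jdbc:oracle:thin:@//dbhost:1521/SERVICE")   # placeholder
      .option("dbtable", "SALES")                                 # placeholder
      .option("partitionColumn", "SALE_ID")                       # placeholder
      .option("lowerBound", 1)
      .option("upperBound", 200000000)
      .option("numPartitions", 200)
      .load())

# Each of the 200 tasks issues roughly:
#   SELECT ... FROM SALES WHERE SALE_ID >= <start> AND SALE_ID < <start + stride>
# (the first and last partitions are open-ended), so a stuck task usually maps
# to one such range.
```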


Re: Static partitioning in partitionBy()

2019-05-08 Thread Gourav Sengupta
Hi Burak,
Hurray, so you finally made Delta open source :)
I always thought of asking TD: is there any chance we could get the
streaming graphs back in the Spark UI? It would just be wonderful.

Hi Shubham,
there is always an easier way and a super fancy way to solve problems;
filtering data before persisting is the simple way. Similarly, handling data
skew in a simple way would be to use the monotonically_increasing_id function
in Spark with the modulus operator. As for the fancy way, I am sure someone in
the world is working on it for mere mortals like me :)


Regards,
Gourav Sengupta





On Wed, May 8, 2019 at 1:41 PM Shubham Chaurasia 
wrote:

> Thanks
>
> On Wed, May 8, 2019 at 10:36 AM Felix Cheung 
> wrote:
>
>> You could
>>
>> df.filter(col("c") === "c1").write.partitionBy("c").save
>>
>> It could get some data skew problem but might work for you
>>
>>
>>
>> --
>> *From:* Burak Yavuz 
>> *Sent:* Tuesday, May 7, 2019 9:35:10 AM
>> *To:* Shubham Chaurasia
>> *Cc:* dev; user@spark.apache.org
>> *Subject:* Re: Static partitioning in partitionBy()
>>
>> It depends on the data source. Delta Lake (https://delta.io) allows you
>> to do it with the .option("replaceWhere", "c = c1"). With other file
>> formats, you can write directly into the partition directory
>> (tablePath/c=c1), but you lose atomicity.
>>
>> On Tue, May 7, 2019, 6:36 AM Shubham Chaurasia 
>> wrote:
>>
>>> Hi All,
>>>
>>> Is there a way I can provide static partitions in partitionBy()?
>>>
>>> Like:
>>>
>>> df.write.mode("overwrite").format("MyDataSource").partitionBy("c=c1").save
>>>
>>> Above code gives following error as it tries to find column `c=c1` in df.
>>>
>>> org.apache.spark.sql.AnalysisException: Partition column `c=c1` not
>>> found in schema struct;
>>>
>>> Thanks,
>>> Shubham
>>>
>>


Re: Static partitioning in partitionBy()

2019-05-08 Thread Shubham Chaurasia
Thanks

On Wed, May 8, 2019 at 10:36 AM Felix Cheung 
wrote:

> You could
>
> df.filter(col("c") === "c1").write.partitionBy("c").save
>
> It could get some data skew problem but might work for you
>
>
>
> --
> *From:* Burak Yavuz 
> *Sent:* Tuesday, May 7, 2019 9:35:10 AM
> *To:* Shubham Chaurasia
> *Cc:* dev; user@spark.apache.org
> *Subject:* Re: Static partitioning in partitionBy()
>
> It depends on the data source. Delta Lake (https://delta.io) allows you
> to do it with the .option("replaceWhere", "c = c1"). With other file
> formats, you can write directly into the partition directory
> (tablePath/c=c1), but you lose atomicity.
>
> On Tue, May 7, 2019, 6:36 AM Shubham Chaurasia 
> wrote:
>
>> Hi All,
>>
>> Is there a way I can provide static partitions in partitionBy()?
>>
>> Like:
>> df.write.mode("overwrite").format("MyDataSource").partitionBy("c=c1").save
>>
>> Above code gives following error as it tries to find column `c=c1` in df.
>>
>> org.apache.spark.sql.AnalysisException: Partition column `c=c1` not found
>> in schema struct;
>>
>> Thanks,
>> Shubham
>>
>


Re: Static partitioning in partitionBy()

2019-05-07 Thread Felix Cheung
You could

df.filter(col("c") === "c1").write.partitionBy("c").save

It could get some data skew problem but might work for you




From: Burak Yavuz 
Sent: Tuesday, May 7, 2019 9:35:10 AM
To: Shubham Chaurasia
Cc: dev; user@spark.apache.org
Subject: Re: Static partitioning in partitionBy()

It depends on the data source. Delta Lake (https://delta.io) allows you to do 
it with the .option("replaceWhere", "c = c1"). With other file formats, you can 
write directly into the partition directory (tablePath/c=c1), but you lose 
atomicity.

On Tue, May 7, 2019, 6:36 AM Shubham Chaurasia 
mailto:shubh.chaura...@gmail.com>> wrote:
Hi All,

Is there a way I can provide static partitions in partitionBy()?

Like:
df.write.mode("overwrite").format("MyDataSource").partitionBy("c=c1").save

Above code gives following error as it tries to find column `c=c1` in df.

org.apache.spark.sql.AnalysisException: Partition column `c=c1` not found in 
schema struct;

Thanks,
Shubham


Re: Static partitioning in partitionBy()

2019-05-07 Thread Burak Yavuz
It depends on the data source. Delta Lake (https://delta.io) allows you to
do it with the .option("replaceWhere", "c = c1"). With other file formats,
you can write directly into the partition directory (tablePath/c=c1), but
you lose atomicity.

On Tue, May 7, 2019, 6:36 AM Shubham Chaurasia 
wrote:

> Hi All,
>
> Is there a way I can provide static partitions in partitionBy()?
>
> Like:
> df.write.mode("overwrite").format("MyDataSource").partitionBy("c=c1").save
>
> Above code gives following error as it tries to find column `c=c1` in df.
>
> org.apache.spark.sql.AnalysisException: Partition column `c=c1` not found
> in schema struct;
>
> Thanks,
> Shubham
>
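
A hedged PySpark sketch of the replaceWhere approach described above; the Delta
table path and the partition value "c1" are placeholders, and the filter keeps the
written data consistent with the predicate.

```
from pyspark.sql import functions as F

(df.filter(F.col("c") == "c1")
   .write
   .format("delta")
   .mode("overwrite")
   .option("replaceWhere", "c = 'c1'")
   .save("/tmp/delta/my_table"))   # placeholder table path
```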


Static partitioning in partitionBy()

2019-05-07 Thread Shubham Chaurasia
Hi All,

Is there a way I can provide static partitions in partitionBy()?

Like:
df.write.mode("overwrite").format("MyDataSource").partitionBy("c=c1").save

Above code gives following error as it tries to find column `c=c1` in df.

org.apache.spark.sql.AnalysisException: Partition column `c=c1` not found
in schema struct;

Thanks,
Shubham


Re: Howto force spark to honor parquet partitioning

2019-05-03 Thread Gourav Sengupta
so you want data from one physical partition on disk to go to only one
executor?

On Fri, May 3, 2019 at 5:38 PM Tomas Bartalos 
wrote:

> Hello,
>
> I have partitioned parquet files based on "event_hour" column.
> After reading parquet files to spark:
> spark.read.format("parquet").load("...")
> Files from the same parquet partition are scattered in many spark
> partitions.
>
> Example of mapping spark partition -> parquet partition:
>
> Spark partition 1 -> 2019050101, 2019050102, 2019050103
> Spark partition 2 -> 2019050101, 2019050103, 2019050104
> ...
> Spark partition 20 -> 2019050101, ...
> Spark partition 21 -> 2019050101, ...
>
> As you can see parquet partition 2019050101 is present in Spark partition
> 1, 2, 20, 21.
> As a result when I write out the dataFrame:
> df.write.partitionBy("event_hour").format("parquet").save("...")
>
>  There are many files created in one parquet partition (In case of our
> example its 4 files, but in reality its much more)
> To speed up queries, my goal is to write 1 file per parquet partition (1
> file per hour).
>
> So far my only solution is to use repartition:
> df.repartition(col("event_hour"))
>
> But there is a lot of overhead with unnecessary shuffle. I'd like to force
> spark to "pickup" the parquet partitioning.
>
> In my investigation I've found
> org.apache.spark.sql.execution.FileSourceScanExec#createNonBucketedReadRDD
> <https://github.com/apache/spark/blob/a44880ba74caab7a987128cb09c4bee41617770a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L452>
> where the initial partitioning is happening based on file sizes. There is
> an explicit ordering which causes parquet partition shuffle.
>
> thank you for your help,
> Tomas
>


Howto force spark to honor parquet partitioning

2019-05-03 Thread Tomas Bartalos
Hello,

I have partitioned parquet files based on "event_hour" column.
After reading parquet files to spark:
spark.read.format("parquet").load("...")
Files from the same parquet partition are scattered in many spark
partitions.

Example of mapping spark partition -> parquet partition:

Spark partition 1 -> 2019050101, 2019050102, 2019050103
Spark partition 2 -> 2019050101, 2019050103, 2019050104
...
Spark partition 20 -> 2019050101, ...
Spark partition 21 -> 2019050101, ...

As you can see parquet partition 2019050101 is present in Spark partition
1, 2, 20, 21.
As a result when I write out the dataFrame:
df.write.partitionBy("event_hour").format("parquet").save("...")

There are many files created in one parquet partition (in our example it's 4
files, but in reality it's many more).
To speed up queries, my goal is to write 1 file per parquet partition (1
file per hour).

So far my only solution is to use repartition:
df.repartition(col("event_hour"))

But there is a lot of overhead from the unnecessary shuffle. I'd like to force
Spark to "pick up" the parquet partitioning.

In my investigation I've found
org.apache.spark.sql.execution.FileSourceScanExec#createNonBucketedReadRDD
<https://github.com/apache/spark/blob/a44880ba74caab7a987128cb09c4bee41617770a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L452>
where the initial partitioning is happening based on file sizes. There is
an explicit ordering which causes parquet partition shuffle.

thank you for your help,
Tomas
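
A small hedged PySpark sketch of one way to produce the "spark partition -> parquet
partition" mapping shown above, by tagging rows with spark_partition_id() right
after the read; the load path is a placeholder.

```
from pyspark.sql import functions as F

df = spark.read.format("parquet").load("/path/to/events")   # placeholder path

(df.withColumn("spark_partition", F.spark_partition_id())
   .groupBy("spark_partition")
   .agg(F.collect_set("event_hour").alias("event_hours"))
   .orderBy("spark_partition")
   .show(50, truncate=False))
```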


Re: Pyspark Partitioning

2018-10-04 Thread Vitaliy Pisarev
GroupBy is an operator you would use if you wanted to *aggregate* the
values that are grouped by the specified key.

In your case you want to retain access to the values.

You need to do df.partitionBy and then you can map the partitions. Of
course you need to be careful of potential skews in the resulting
partitions.

On Thu, Oct 4, 2018, 23:27 dimitris plakas  wrote:

> Hello everyone,
>
> Here is an issue that i am facing in partitioning dtafarame.
>
> I have a dataframe which called data_df. It is look like:
>
> Group_Id | Object_Id | Trajectory
>1 |  obj1| Traj1
>2 |  obj2| Traj2
>1 |  obj3| Traj3
>3 |  obj4| Traj4
>2 |  obj5| Traj5
>
> This dataframe has 5045 rows where each row has value in Group_Id from 1
> to 7, and the number of rows per group_id is arbitrary.
> I want to split the rdd which produced by from this dataframe in 7
> partitions one for each group_id and then apply mapPartitions() where i
> call function custom_func(). How can i create these partitions from this
> dataframe? Should i first apply group by (create the grouped_df) in order
> to create a dataframe with 7 rows and then call
> partitioned_rdd=grouped_df.rdd.mapPartitions()?
> Which is the optimal way to do it?
>
> Thank you in advance
>
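
DataFrames do not expose partitionBy directly, so here is a hedged PySpark 2.3+/2.4
sketch of one way to get all rows of a Group_Id processed together: a grouped-map
pandas UDF (requires pyarrow). custom_func below is only a placeholder for the real
per-group logic, and data_df is the dataframe from the question.

```
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(data_df.schema, PandasUDFType.GROUPED_MAP)
def custom_func(pdf):
    # pdf is a pandas DataFrame holding every row of one Group_Id
    return pdf  # placeholder: return a pandas DataFrame matching the schema

result = data_df.groupBy("Group_Id").apply(custom_func)
```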


Pyspark Partitioning

2018-10-04 Thread dimitris plakas
Hello everyone,

Here is an issue that I am facing in partitioning a dataframe.

I have a dataframe called data_df. It looks like:

Group_Id | Object_Id | Trajectory
       1 | obj1      | Traj1
       2 | obj2      | Traj2
       1 | obj3      | Traj3
       3 | obj4      | Traj4
       2 | obj5      | Traj5

This dataframe has 5045 rows, where each row has a Group_Id value from 1 to
7, and the number of rows per group_id is arbitrary.
I want to split the RDD produced from this dataframe into 7 partitions, one
for each group_id, and then apply mapPartitions(), where I call the function
custom_func(). How can I create these partitions from this dataframe? Should I
first apply a group by (creating grouped_df) in order to get a dataframe with
7 rows, and then call partitioned_rdd = grouped_df.rdd.mapPartitions()?
Which is the optimal way to do it?

Thank you in advance


Re: Pyspark Partitioning

2018-10-01 Thread Gourav Sengupta
Hi,

the simplest option is to create UDFs for these different functions and
then use a case statement (or similar) in SQL and pass it on. But this is low
tech; if you have conditions based on record values which are even
more granular, why not use a single UDF and then let the conditions handle it.

But I think that UDFs are not that great unless you use Scala.

It will be interesting to see if there are other scalable options (which
are not RDD based) from the group.

Regards,
Gourav Sengupta

On Sun, Sep 30, 2018 at 7:31 PM dimitris plakas 
wrote:

> Hello everyone,
>
> I am trying to split a dataframe on partitions and i want to apply a
> custom function on every partition. More precisely i have a dataframe like
> the one below
>
> Group_Id | Id | Points
> 1| id1| Point1
> 2| id2| Point2
>
> I want to have a partition for every Group_Id and apply on every partition
> a function defined by me.
> I have tried with partitionBy('Group_Id').mapPartitions() but i receive
> error.
> Could you please advice me how to do it?
>
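
A rough PySpark sketch of the UDF-plus-case idea; the two lambdas are placeholders
for the real per-group functions, df stands for the dataframe in the question, and
Group_Id/Points are the columns shown there.

```
from pyspark.sql import functions as F, types as T

func_a = F.udf(lambda p: "A:" + str(p), T.StringType())   # placeholder logic
func_b = F.udf(lambda p: "B:" + str(p), T.StringType())   # placeholder logic

result = df.withColumn(
    "processed",
    F.when(F.col("Group_Id") == 1, func_a("Points"))
     .when(F.col("Group_Id") == 2, func_b("Points"))
     .otherwise(F.col("Points")))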


Re: Pyspark Partitioning

2018-09-30 Thread ayan guha
Hi

There is a set of functions which can be used with the construct
OVER (PARTITION BY col ORDER BY col).

You can search for rank and window functions in the Spark documentation.

On Mon, 1 Oct 2018 at 5:29 am, Riccardo Ferrari  wrote:

> Hi Dimitris,
>
> I believe the methods partitionBy and mapPartitions are specific to RDDs,
> while you're talking about DataFrames.
> I guess you have a few options, including:
> 1. Use the DataFrame.rdd call and process the returned RDD. Please note the
> return type for this call is an RDD of Row.
> 2. Use the groupBy from DataFrames and start from there; this may involve
> defining a UDF or leveraging the existing GroupedData functions.
>
> It really depends on your use-case and your performance requirements.
> HTH
>
> On Sun, Sep 30, 2018 at 8:31 PM dimitris plakas 
> wrote:
>
>> Hello everyone,
>>
>> I am trying to split a dataframe on partitions and i want to apply a
>> custom function on every partition. More precisely i have a dataframe like
>> the one below
>>
>> Group_Id | Id | Points
>> 1| id1| Point1
>> 2| id2| Point2
>>
>> I want to have a partition for every Group_Id and apply on every
>> partition a function defined by me.
>> I have tried with partitionBy('Group_Id').mapPartitions() but i receive
>> error.
>> Could you please advice me how to do it?
>>
> --
Best Regards,
Ayan Guha


Re: Pyspark Partitioning

2018-09-30 Thread Riccardo Ferrari
Hi Dimitris,

I believe the methods partitionBy and mapPartitions are specific to RDDs,
while you're talking about DataFrames.
I guess you have a few options, including:
1. Use the DataFrame.rdd call and process the returned RDD. Please note the
return type for this call is an RDD of Row.
2. Use the groupBy from DataFrames and start from there; this may involve
defining a UDF or leveraging the existing GroupedData functions.

It really depends on your use-case and your performance requirements.
HTH

On Sun, Sep 30, 2018 at 8:31 PM dimitris plakas 
wrote:

> Hello everyone,
>
> I am trying to split a dataframe on partitions and i want to apply a
> custom function on every partition. More precisely i have a dataframe like
> the one below
>
> Group_Id | Id | Points
> 1| id1| Point1
> 2| id2| Point2
>
> I want to have a partition for every Group_Id and apply on every partition
> a function defined by me.
> I have tried with partitionBy('Group_Id').mapPartitions() but i receive
> error.
> Could you please advice me how to do it?
>


Pyspark Partitioning

2018-09-30 Thread dimitris plakas
Hello everyone,

I am trying to split a dataframe into partitions and I want to apply a custom
function on every partition. More precisely, I have a dataframe like the one
below

Group_Id | Id | Points
1| id1| Point1
2| id2| Point2

I want to have a partition for every Group_Id and apply on every partition
a function defined by me.
I have tried partitionBy('Group_Id').mapPartitions() but I receive an
error.
Could you please advise me how to do it?


Re: Pitfalls of partitioning by host?

2018-08-28 Thread Patrick McCarthy
I'm not 100% sure, but a naive repartition() seems to cause a shuffle. If
this is actually happening, it's just wasteful overhead. The ambition is to
say "divide the data into partitions, but make sure you don't move it in
doing so".



On Tue, Aug 28, 2018 at 2:06 PM, Patrick McCarthy 
wrote:

> I'm not 100% sure, but a naive repartition() seems to cause a shuffle. If
> this is actually happening, it's just wasteful overhead.
>
> On Tue, Aug 28, 2018 at 1:03 PM, Sonal Goyal 
> wrote:
>
>> Hi Patrick,
>>
>> Sorry is there something here that helps you beyond repartition(number of
>> partitons) or calling your udf on foreachPartition? If your data is on
>> disk, Spark is already partitioning it for you by rows. How is adding the
>> host info helping?
>>
>> Thanks,
>> Sonal
>> Nube Technologies <http://www.nubetech.co>
>>
>> <http://in.linkedin.com/in/sonalgoyal>
>>
>>
>>
>> On Tue, Aug 28, 2018 at 8:29 PM, Patrick McCarthy <
>> pmccar...@dstillery.com.invalid> wrote:
>>
>>> Mostly I'm guessing that it adds efficiency to a job where partitioning
>>> is required but shuffling is not.
>>>
>>> For example, if I want to apply a UDF to 1tb of records on disk, I might
>>> need to repartition(5) to get the task size down to an acceptable size
>>> for my cluster. If I don't care that it's totally balanced, then I'd hope
>>> that I could save a lot of overhead with
>>>
>>> foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
>>> 'randkey','host').apply(udf)
>>>
>>>
>>> On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz 
>>> wrote:
>>>
>>>> Well if we think of shuffling as a necessity to perform an operation,
>>>> then the problem would be that you are adding a ln aggregation stage to a
>>>> job that is going to get shuffled anyway.  Like if you need to join two
>>>> datasets, then Spark will still shuffle the data, whether they are grouped
>>>> by hostname prior to that or not.  My question is, is there anything else
>>>> that you would expect to gain, except for enforcing maybe a dataset that is
>>>> already bucketed? Like you could enforce that data is where it is supposed
>>>> to be, but what else would you avoid?
>>>>
>>>> Sent from my iPhone
>>>>
>>>> > On Aug 27, 2018, at 12:22 PM, Patrick McCarthy <
>>>> pmccar...@dstillery.com.INVALID> wrote:
>>>> >
>>>> > When debugging some behavior on my YARN cluster I wrote the following
>>>> PySpark UDF to figure out what host was operating on what row of data:
>>>> >
>>>> > @F.udf(T.StringType())
>>>> > def add_hostname(x):
>>>> >
>>>> > import socket
>>>> >
>>>> > return str(socket.gethostname())
>>>> >
>>>> > It occurred to me that I could use this to enforce node-locality for
>>>> other operations:
>>>> >
>>>> > df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
>>>> >
>>>> > When working on a big job without obvious partition keys, this seems
>>>> like a very straightforward way to avoid a shuffle, but it seems too easy.
>>>> >
>>>> > What problems would I introduce by trying to partition on hostname
>>>> like this?
>>>>
>>>> -
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>
>>>>
>>>
>>
>


Re: Pitfalls of partitioning by host?

2018-08-28 Thread Patrick McCarthy
I'm not 100% sure, but a naive repartition() seems to cause a shuffle. If
this is actually happening, it's just wasteful overhead.

On Tue, Aug 28, 2018 at 1:03 PM, Sonal Goyal  wrote:

> Hi Patrick,
>
> Sorry is there something here that helps you beyond repartition(number of
> partitons) or calling your udf on foreachPartition? If your data is on
> disk, Spark is already partitioning it for you by rows. How is adding the
> host info helping?
>
> Thanks,
> Sonal
> Nube Technologies <http://www.nubetech.co>
>
> <http://in.linkedin.com/in/sonalgoyal>
>
>
>
> On Tue, Aug 28, 2018 at 8:29 PM, Patrick McCarthy <
> pmccar...@dstillery.com.invalid> wrote:
>
>> Mostly I'm guessing that it adds efficiency to a job where partitioning
>> is required but shuffling is not.
>>
>> For example, if I want to apply a UDF to 1tb of records on disk, I might
>> need to repartition(5) to get the task size down to an acceptable size
>> for my cluster. If I don't care that it's totally balanced, then I'd hope
>> that I could save a lot of overhead with
>>
>> foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
>> 'randkey','host').apply(udf)
>>
>>
>> On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz 
>> wrote:
>>
>>> Well if we think of shuffling as a necessity to perform an operation,
>>> then the problem would be that you are adding a ln aggregation stage to a
>>> job that is going to get shuffled anyway.  Like if you need to join two
>>> datasets, then Spark will still shuffle the data, whether they are grouped
>>> by hostname prior to that or not.  My question is, is there anything else
>>> that you would expect to gain, except for enforcing maybe a dataset that is
>>> already bucketed? Like you could enforce that data is where it is supposed
>>> to be, but what else would you avoid?
>>>
>>> Sent from my iPhone
>>>
>>> > On Aug 27, 2018, at 12:22 PM, Patrick McCarthy <
>>> pmccar...@dstillery.com.INVALID> wrote:
>>> >
>>> > When debugging some behavior on my YARN cluster I wrote the following
>>> PySpark UDF to figure out what host was operating on what row of data:
>>> >
>>> > @F.udf(T.StringType())
>>> > def add_hostname(x):
>>> >
>>> > import socket
>>> >
>>> > return str(socket.gethostname())
>>> >
>>> > It occurred to me that I could use this to enforce node-locality for
>>> other operations:
>>> >
>>> > df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
>>> >
>>> > When working on a big job without obvious partition keys, this seems
>>> like a very straightforward way to avoid a shuffle, but it seems too easy.
>>> >
>>> > What problems would I introduce by trying to partition on hostname
>>> like this?
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>


Re: [External Sender] Pitfalls of partitioning by host?

2018-08-28 Thread Jayesh Lalwani
If you group by the host that you have computed using the UDF, Spark is
always going to shuffle your dataset, even if the end result is that all
the new partitions look exactly like the old partitions, just placed on
different nodes. Remember the hostname will probably hash differently
than the partition key of the data.

Let's say what you are trying to do is read a file, apply a UDF, and write out
to a file. Without your "performance improvement", Spark will read the
partitions, apply the UDF to the rows in the partitions, and write the rows
out. With your upgrade, it will read the partitions, apply the hostname UDF,
shuffle by hostname, apply the UDF on the shuffled rows, and write the data
out.

If your intent is to increase efficiency, this will do the opposite of what
you are trying to do.

On Mon, Aug 27, 2018 at 1:23 PM Patrick McCarthy
 wrote:

> When debugging some behavior on my YARN cluster I wrote the following
> PySpark UDF to figure out what host was operating on what row of data:
>
> @F.udf(T.StringType())
> def add_hostname(x):
>
> import socket
>
> return str(socket.gethostname())
>
> It occurred to me that I could use this to enforce node-locality for other
> operations:
>
> df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
>
> When working on a big job without obvious partition keys, this seems like
> a very straightforward way to avoid a shuffle, but it seems too easy.
>
> What problems would I introduce by trying to partition on hostname like
> this?
>


Re: Pitfalls of partitioning by host?

2018-08-28 Thread Sonal Goyal
Hi Patrick,

Sorry is there something here that helps you beyond repartition(number of
partitons) or calling your udf on foreachPartition? If your data is on
disk, Spark is already partitioning it for you by rows. How is adding the
host info helping?

Thanks,
Sonal
Nube Technologies <http://www.nubetech.co>

<http://in.linkedin.com/in/sonalgoyal>



On Tue, Aug 28, 2018 at 8:29 PM, Patrick McCarthy <
pmccar...@dstillery.com.invalid> wrote:

> Mostly I'm guessing that it adds efficiency to a job where partitioning is
> required but shuffling is not.
>
> For example, if I want to apply a UDF to 1tb of records on disk, I might
> need to repartition(5) to get the task size down to an acceptable size
> for my cluster. If I don't care that it's totally balanced, then I'd hope
> that I could save a lot of overhead with
>
> foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
> 'randkey','host').apply(udf)
>
>
> On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz 
> wrote:
>
>> Well if we think of shuffling as a necessity to perform an operation,
>> then the problem would be that you are adding a ln aggregation stage to a
>> job that is going to get shuffled anyway.  Like if you need to join two
>> datasets, then Spark will still shuffle the data, whether they are grouped
>> by hostname prior to that or not.  My question is, is there anything else
>> that you would expect to gain, except for enforcing maybe a dataset that is
>> already bucketed? Like you could enforce that data is where it is supposed
>> to be, but what else would you avoid?
>>
>> Sent from my iPhone
>>
>> > On Aug 27, 2018, at 12:22 PM, Patrick McCarthy > .INVALID> wrote:
>> >
>> > When debugging some behavior on my YARN cluster I wrote the following
>> PySpark UDF to figure out what host was operating on what row of data:
>> >
>> > @F.udf(T.StringType())
>> > def add_hostname(x):
>> >
>> > import socket
>> >
>> > return str(socket.gethostname())
>> >
>> > It occurred to me that I could use this to enforce node-locality for
>> other operations:
>> >
>> > df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
>> >
>> > When working on a big job without obvious partition keys, this seems
>> like a very straightforward way to avoid a shuffle, but it seems too easy.
>> >
>> > What problems would I introduce by trying to partition on hostname like
>> this?
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: Pitfalls of partitioning by host?

2018-08-28 Thread Patrick McCarthy
Mostly I'm guessing that it adds efficiency to a job where partitioning is
required but shuffling is not.

For example, if I want to apply a UDF to 1tb of records on disk, I might
need to repartition(5) to get the task size down to an acceptable size
for my cluster. If I don't care that it's totally balanced, then I'd hope
that I could save a lot of overhead with

foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
'randkey','host').apply(udf)


On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz 
wrote:

> Well if we think of shuffling as a necessity to perform an operation, then
> the problem would be that you are adding a ln aggregation stage to a job
> that is going to get shuffled anyway.  Like if you need to join two
> datasets, then Spark will still shuffle the data, whether they are grouped
> by hostname prior to that or not.  My question is, is there anything else
> that you would expect to gain, except for enforcing maybe a dataset that is
> already bucketed? Like you could enforce that data is where it is supposed
> to be, but what else would you avoid?
>
> Sent from my iPhone
>
> > On Aug 27, 2018, at 12:22 PM, Patrick McCarthy 
> > 
> wrote:
> >
> > When debugging some behavior on my YARN cluster I wrote the following
> PySpark UDF to figure out what host was operating on what row of data:
> >
> > @F.udf(T.StringType())
> > def add_hostname(x):
> >
> > import socket
> >
> > return str(socket.gethostname())
> >
> > It occurred to me that I could use this to enforce node-locality for
> other operations:
> >
> > df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
> >
> > When working on a big job without obvious partition keys, this seems
> like a very straightforward way to avoid a shuffle, but it seems too easy.
> >
> > What problems would I introduce by trying to partition on hostname like
> this?
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Pitfalls of partitioning by host?

2018-08-28 Thread Patrick McCarthy
Mostly I'm guessing that it adds efficiency to a job where partitioning is
required but shuffling is not.

For example, if I want to apply a UDF to 1tb of records on disk, I might
need to repartition(5) to get the task size down to an acceptable size
for my cluster. If I don't care that it's totally balanced, then I'd hope
that I could save a lot of overhead with

foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
'randkey','host').apply(udf)


On Tue, Aug 28, 2018 at 10:28 AM, Patrick McCarthy 
wrote:

> Mostly I'm guessing that it adds efficiency to a job where partitioning is
> required but shuffling is not.
>
> For example, if I want to apply a UDF to 1tb of records on disk, I might
> need to repartition(5) to get the task size down to an acceptable size
> for my cluster. If I don't care that it's totally balanced, then I'd hope
> that I could save a lot of overhead with
>
> foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
> 'randkey','host').apply(udf)
>
> On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz 
> wrote:
>
>> Well if we think of shuffling as a necessity to perform an operation,
>> then the problem would be that you are adding a ln aggregation stage to a
>> job that is going to get shuffled anyway.  Like if you need to join two
>> datasets, then Spark will still shuffle the data, whether they are grouped
>> by hostname prior to that or not.  My question is, is there anything else
>> that you would expect to gain, except for enforcing maybe a dataset that is
>> already bucketed? Like you could enforce that data is where it is supposed
>> to be, but what else would you avoid?
>>
>> Sent from my iPhone
>>
>> > On Aug 27, 2018, at 12:22 PM, Patrick McCarthy > .INVALID> wrote:
>> >
>> > When debugging some behavior on my YARN cluster I wrote the following
>> PySpark UDF to figure out what host was operating on what row of data:
>> >
>> > @F.udf(T.StringType())
>> > def add_hostname(x):
>> >
>> > import socket
>> >
>> > return str(socket.gethostname())
>> >
>> > It occurred to me that I could use this to enforce node-locality for
>> other operations:
>> >
>> > df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
>> >
>> > When working on a big job without obvious partition keys, this seems
>> like a very straightforward way to avoid a shuffle, but it seems too easy.
>> >
>> > What problems would I introduce by trying to partition on hostname like
>> this?
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: Pitfalls of partitioning by host?

2018-08-27 Thread Michael Artz
Well, if we think of shuffling as a necessity to perform an operation, then the
problem would be that you are adding an aggregation stage to a job that is
going to get shuffled anyway. Like if you need to join two datasets, then
Spark will still shuffle the data, whether they are grouped by hostname prior
to that or not. My question is, is there anything else that you would expect
to gain, except for enforcing maybe a dataset that is already bucketed? Like
you could enforce that data is where it is supposed to be, but what else would
you avoid?

Sent from my iPhone

> On Aug 27, 2018, at 12:22 PM, Patrick McCarthy 
>  wrote:
> 
> When debugging some behavior on my YARN cluster I wrote the following PySpark 
> UDF to figure out what host was operating on what row of data:
> 
> @F.udf(T.StringType())
> def add_hostname(x):
> 
> import socket
> 
> return str(socket.gethostname())
> 
> It occurred to me that I could use this to enforce node-locality for other 
> operations:
> 
> df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
> 
> When working on a big job without obvious partition keys, this seems like a 
> very straightforward way to avoid a shuffle, but it seems too easy. 
> 
> What problems would I introduce by trying to partition on hostname like this?


Pitfalls of partitioning by host?

2018-08-27 Thread Patrick McCarthy
When debugging some behavior on my YARN cluster I wrote the following
PySpark UDF to figure out what host was operating on what row of data:

@F.udf(T.StringType())
def add_hostname(x):
    import socket
    return str(socket.gethostname())

It occurred to me that I could use this to enforce node-locality for other
operations:

df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)

When working on a big job without obvious partition keys, this seems like a
very straightforward way to avoid a shuffle, but it seems too easy.

What problems would I introduce by trying to partition on hostname like
this?


Dynamic partitioning weird behavior

2018-08-07 Thread Nikolay Skovpin
Hi guys.
I was investigating the Spark property
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic"). It
works perfectly on a local fs, but on s3 I stumbled into a strange behavior.
If I don't have a Hive table, or the table is empty, Spark won't save any
data into the table with SaveMode.Overwrite.
What i did:
import org.apache.spark.sql.{SaveMode, SparkSession}

  val spark = SparkSession.builder()
  .appName("Test for dynamic partitioning")
  .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
  .getOrCreate()
  
 val users = Seq(
 ("11", "Nikolay", "1900", "1"),
 ("12", "Nikolay", "1900", "1"),
 ("13", "Sergey", "1901", "1"),
 ("14", "Jone", "1900", "2"))
 .toDF("user_id", "name","year", "month")

users.write.partitionBy("year",
"month").mode(SaveMode.Overwrite).option("path",
"s3://dynamicPartitioning/users").saveAsTable("test.users")

I can see from the logs that Spark populates the .spark-staging directory with the
data, then Spark executes the rename command.
But AlterTableRecoverPartitionsCommand shows me the message "Found 0
partitions, Finished to gather the fast stats for all 0 partitions". After
that the directory on s3 is empty (except for the _SUCCESS flag).
Is this ok, or is it a bug?




Overwrite only specific partition with hive dynamic partitioning

2018-08-01 Thread Nirav Patel
Hi,

I have a Hive partitioned table created using sparkSession. I would like to
insert/overwrite DataFrame data into a specific set of partitions without
losing any other partition. In each run I have to update a set of partitions,
not just one.

e.g. the first time I have a dataframe with bid=1, bid=2, bid=3 and I can
write it by using

`df.write.mode(SaveMode.Overwrite).partitionBy("bid").parquet(TableBaseLocation)`

It generates dirs bid=1, bid=2, bid=3 inside TableBaseLocation.

But the next time, when I have a dataframe with bid=1, bid=4 and use the same
code above, it removes bid=2 and bid=3. In other words I don't get idempotency.

I tried SaveMode.Append but that creates duplicate data inside "bid=1".

I read
https://issues.apache.org/jira/browse/SPARK-18183

With that approach it seems like I may have to update multiple partitions
manually for each input partition. That seems like a lot of work on every
update. Is there a better way for this?

Can this fix be applied to the dataframe-based approach as well?

Thanks
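
A hedged PySpark sketch of the dynamic partition-overwrite mode (Spark 2.3+), which
only replaces the partitions present in the incoming dataframe; TableBaseLocation
stands for the same base path as above.

```
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# Writing bid=1 and bid=4 now leaves the previously written bid=2 and bid=3 alone.
(df.write
   .mode("overwrite")
   .partitionBy("bid")
   .parquet(TableBaseLocation))
```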



[SPARK-SQL] Reading JSON column as a DataFrame and keeping partitioning information

2018-07-20 Thread Daniel Mateus Pires
I've been trying to figure this one out for some time now. I have JSONs 
representing Products coming in, (physically) partitioned by Brand, and I would like 
to create a DataFrame from the JSON but also keep the partitioning information 
(Brand).

```
case class Product(brand: String, value: String)
val df = spark.createDataFrame(Seq(Product("something", """{"a": "b", "c": 
"d"}""")))
df.write.partitionBy("brand").mode("overwrite").json("/tmp/products5/")
val df2 = spark.read.json("/tmp/products5/")

df2.show
/*
+--------------------+---------+
|               value|    brand|
+--------------------+---------+
|{"a": "b", "c": "d"}|something|
+--------------------+---------+
*/


// This is simple and effective but it gets rid of the brand!
spark.read.json(df2.select("value").as[String]).show
/*
+---+---+
|  a|  c|
+---+---+
|  b|  d|
+---+---+
*/
```

Ideally I'd like something similar to spark.read.json that would keep the 
partitioning values and merge it with the rest of the DataFrame

End result I would like:
```
/*
+---+---+---------+
|  a|  c|    brand|
+---+---+---------+
|  b|  d|something|
+---+---+---------+
*/
```

Best regards,
Daniel Mateus Pires
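
A hedged PySpark sketch of one way to keep the partition column: parse the payload
with from_json and an explicit schema instead of re-reading it with spark.read.json;
df2 is the partitioned dataframe from above.

```
from pyspark.sql import functions as F, types as T

payload_schema = T.StructType([
    T.StructField("a", T.StringType()),
    T.StructField("c", T.StringType()),
])

parsed = (df2
          .withColumn("parsed", F.from_json("value", payload_schema))
          .select("parsed.*", "brand"))
parsed.show()
```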


Re: Regarding column partitioning IDs and names as per hierarchical level SparkSQL

2017-11-03 Thread ayan guha
you can use 10 passes over the same dataset and build the data


On Fri, Nov 3, 2017 at 9:48 PM, Jean Georges Perrin <jper...@lumeris.com>
wrote:

> Write a UDF?
>
> On Oct 31, 2017, at 11:48, Aakash Basu <aakash.spark@gmail.com> wrote:
>
> Hey all,
>
> Any help in the below please?
>
> Thanks,
> Aakash.
>
>
> -- Forwarded message --
> From: Aakash Basu <aakash.spark@gmail.com>
> Date: Tue, Oct 31, 2017 at 9:17 PM
> Subject: Regarding column partitioning IDs and names as per hierarchical
> level SparkSQL
> To: user <user@spark.apache.org>
>
>
> Hi all,
>
> I have to generate a table with Spark-SQL with the following columns -
>
>
> Level One Id: VARCHAR(20) NULL
> Level One Name: VARCHAR(50) NOT NULL
> Level Two Id: VARCHAR(20) NULL
> Level Two Name: VARCHAR(50) NULL
> Level Three Id: VARCHAR(20) NULL
> Level Three Name: VARCHAR(50) NULL
> Level Four Id: VARCHAR(20) NULL
> Level Four Name: VARCHAR(50) NULL
> Level Five Id: VARCHAR(20) NULL
> Level Five Name: VARCHAR(50) NULL
> Level Six Id: VARCHAR(20) NULL
> Level Six Name: VARCHAR(50) NULL
> Level Seven Id: VARCHAR(20) NULL
> Level Seven Name: VARCHAR(50) NULL
> Level Eight Id: VARCHAR(20) NULL
> Level Eight Name: VARCHAR(50) NULL
> Level Nine Id: VARCHAR(20) NULL
> Level Nine Name: VARCHAR(50) NULL
> Level Ten Id: VARCHAR(20) NULL
> Level Ten Name: VARCHAR(50) NULL
>
> My input source has these columns -
>
>
> ID        Description        ParentID
> 10        Great-Grandfather
> 1010      Grandfather        10
> 101010    1. Father A        1010
> 101011    2. Father B        1010
> 101012    4. Father C        1010
> 101013    5. Father D        1010
> 101015    3. Father E        1010
> 101018    Father F           1010
> 101019    6. Father G        1010
> 101020    Father H           1010
> 101021    Father I           1010
> 101022    2A. Father J       1010
> 10101010  2. Father K        101010
> Like the above, I have ID till 20 digits, which means, I have 10 levels.
>
> I want to populate the ID and name itself along with all the parents till
> the root for any particular level, which I am unable to create a concrete
> logic for.
>
> Am using this way to fetch respecting levels and populate them in the
> respective columns but not their parents -
>
> Present Logic ->
>
> FinalJoin_DF = spark.sql("select "
>   + "case when length(a.id)/2 = '1' then a.id
> else ' ' end as level_one_id, "
>   + "case when length(a.id)/2 = '1' then a.desc else ' ' end as
> level_one_name, "
>   + "case when length(a.id)/2 = '2' then a.id else ' ' end as
> level_two_id, "
>   + "case when length(a.id)/2 = '2' then a.desc else ' ' end as
> level_two_name, "
>   + "case when length(a.id)/2 = '3' then a.id
> else ' ' end as level_three_id, "
>   + "case when length(a.id)/2 = '3' then a.desc
> else ' ' end as level_three_name, "
>   + "case when length(a.id)/2 = '4' then a.id
> else ' ' end as level_four_id, "
>   + "case when length(a.id)/2 = '4' then a.desc
> else ' ' end as level_four_name, "
>   + "case when length(a.id)/2 = '5' then a.id
> else ' ' end as level_five_id, "
>   + "case when length(a.id)/2 = '5' then a.desc
> else ' ' end as level_five_name, "
>   + "case when length(a.id)/2 = '6' then a.id
> else ' ' end as level_six_id, "
>   + "case when length(a.id)/2 = '6' then a.desc else ' ' end as
> level_six_name, "
>   + "case when length(a.id)/2 = '7' then a.id else ' ' end as
> level_seven_id, "
>   + "case when length(a.id)/2 = '7' then a.desc
> else ' ' end as level_seven_name, "
>   + "case when length(a.id)/2 = '8' then a.id
> else ' ' end as level_eight_id, "
>   + "case when length(a.id)/2 = '8' then a.desc else ' ' end as
> level_eight_name, "
>   + "case when length(a.id)/2 = '9' then a.id
> else ' ' end as level_nine_id, "
>   + "case when length(a.id)/2 = '9' then a.desc else ' ' end as
> level_nine_name, "
>   + "case when length(a.id)/2 = '10' then a.id else ' ' end as
> level_ten_id, "
>   + "case when length(a.id)/2 = '10' then a.desc
> else ' ' end as level_ten_name "
>   + "from CategoryTempTable a")
>
>
> Can someone help me in also populating all the parents levels in the
> respective level ID and level name, please?
>
>
> Thanks,
> Aakash.
>
>
>


-- 
Best Regards,
Ayan Guha
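
A rough PySpark sketch of the multi-pass idea above: in the sample data every
two-digit block of ID is itself an ancestor's ID, so the level-k ancestor id is the
first 2*k digits and its name can be joined back in, one pass per level. src stands
for the input dataframe with ID/Description/ParentID; renaming level_1_id etc. to
the level_one_id-style names is left out, and the code is untested.

```
from pyspark.sql import functions as F

names = src.select(F.col("ID").alias("anc_id"),
                   F.col("Description").alias("anc_name"))

result = src
for k in range(1, 11):
    id_col, name_col = f"level_{k}_id", f"level_{k}_name"
    result = (result
              .withColumn(id_col,
                          F.when(F.length("ID") >= 2 * k,
                                 F.col("ID").substr(1, 2 * k)))
              .join(names.withColumnRenamed("anc_id", id_col)
                         .withColumnRenamed("anc_name", name_col),
                    on=id_col, how="left"))
```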


Re: Regarding column partitioning IDs and names as per hierarchical level SparkSQL

2017-11-03 Thread Jean Georges Perrin
Write a UDF?

> On Oct 31, 2017, at 11:48, Aakash Basu <aakash.spark@gmail.com> wrote:
> 
> Hey all,
> 
> Any help in the below please?
> 
> Thanks,
> Aakash.
> 
> 
> -- Forwarded message --
> From: Aakash Basu <aakash.spark@gmail.com>
> Date: Tue, Oct 31, 2017 at 9:17 PM
> Subject: Regarding column partitioning IDs and names as per hierarchical 
> level SparkSQL
> To: user <user@spark.apache.org>
> 
> 
> Hi all,
> 
> I have to generate a table with Spark-SQL with the following columns -
> 
> 
> Level One Id: VARCHAR(20) NULL
> Level One Name: VARCHAR(50) NOT NULL
> Level Two Id: VARCHAR(20) NULL
> Level Two Name: VARCHAR(50) NULL
> Level Three Id: VARCHAR(20) NULL
> Level Three Name: VARCHAR(50) NULL
> Level Four Id: VARCHAR(20) NULL
> Level Four Name: VARCHAR(50) NULL
> Level Five Id: VARCHAR(20) NULL
> Level Five Name: VARCHAR(50) NULL
> Level Six Id: VARCHAR(20) NULL
> Level Six Name: VARCHAR(50) NULL
> Level Seven Id: VARCHAR(20) NULL
> Level Seven Name: VARCHAR(50) NULL
> Level Eight Id: VARCHAR(20) NULL
> Level Eight Name: VARCHAR(50) NULL
> Level Nine Id: VARCHAR(20) NULL
> Level Nine Name: VARCHAR(50) NULL
> Level Ten Id: VARCHAR(20) NULL
> Level Ten Name: VARCHAR(50) NULL
> 
> My input source has these columns -
> 
> 
> ID        Description        ParentID
> 10        Great-Grandfather
> 1010      Grandfather        10
> 101010    1. Father A        1010
> 101011    2. Father B        1010
> 101012    4. Father C        1010
> 101013    5. Father D        1010
> 101015    3. Father E        1010
> 101018    Father F           1010
> 101019    6. Father G        1010
> 101020    Father H           1010
> 101021    Father I           1010
> 101022    2A. Father J       1010
> 10101010  2. Father K        101010
> 
> Like the above, I have ID till 20 digits, which means, I have 10 levels.
> 
> I want to populate the ID and name itself along with all the parents till the 
> root for any particular level, which I am unable to create a concrete logic 
> for.
> 
> Am using this way to fetch respecting levels and populate them in the 
> respective columns but not their parents -
> 
> Present Logic ->
> 
> FinalJoin_DF = spark.sql("select "
>   + "case when length(a.id)/2 = '1' then a.id else ' ' end as level_one_id, "
>   + "case when length(a.id)/2 = '1' then a.desc else ' ' end as level_one_name, "
>   + "case when length(a.id)/2 = '2' then a.id else ' ' end as level_two_id, "
>   + "case when length(a.id)/2 = '2' then a.desc else ' ' end as level_two_name, "
>   + "case when length(a.id)/2 = '3' then a.id else ' ' end as level_three_id, "
>   + "case when length(a.id)/2 = '3' then a.desc else ' ' end as level_three_name, "
>   + "case when length(a.id)/2 = '4' then a.id else ' ' end as level_four_id, "
>   + "case when length(a.id)/2 = '4' then a.desc else ' ' end as level_four_name, "
>   + "case when length(a.id)/2 = '5' then a.id else ' ' end as level_five_id, "
>   + "case when length(a.id)/2 = '5' then a.desc else ' ' end as level_five_name, "
>   + "case when length(a.id)/2 = '6' then a.id else ' ' end as level_six_id, "
>   + "case when length(a.id)/2 = '6' then a.desc else ' ' end as level_six_name, "
>   + "case when length(a.id)/2 = '7' then a.id else ' ' end as level_seven_id, "
>   + "case when length(a.id)/2 = '7' then a.desc else ' ' end as level_seven_name, "
>   + "case when length(a.id)/2 = '8' then a.id else ' ' end as level_eight_id, "
> 

Fwd: Regarding column partitioning IDs and names as per hierarchical level SparkSQL

2017-10-31 Thread Aakash Basu
Hey all,

Any help in the below please?

Thanks,
Aakash.


-- Forwarded message --
From: Aakash Basu <aakash.spark@gmail.com>
Date: Tue, Oct 31, 2017 at 9:17 PM
Subject: Regarding column partitioning IDs and names as per hierarchical
level SparkSQL
To: user <user@spark.apache.org>


Hi all,

I have to generate a table with Spark-SQL with the following columns -


Level One Id: VARCHAR(20) NULL
Level One Name: VARCHAR(50) NOT NULL
Level Two Id: VARCHAR(20) NULL
Level Two Name: VARCHAR(50) NULL
Level Three Id: VARCHAR(20) NULL
Level Three Name: VARCHAR(50) NULL
Level Four Id: VARCHAR(20) NULL
Level Four Name: VARCHAR(50) NULL
Level Five Id: VARCHAR(20) NULL
Level Five Name: VARCHAR(50) NULL
Level Six Id: VARCHAR(20) NULL
Level Six Name: VARCHAR(50) NULL
Level Seven Id: VARCHAR(20) NULL
Level Seven Name: VARCHAR(50) NULL
Level Eight Id: VARCHAR(20) NULL
Level Eight Name: VARCHAR(50) NULL
Level Nine Id: VARCHAR(20) NULL
Level Nine Name: VARCHAR(50) NULL
Level Ten Id: VARCHAR(20) NULL
Level Ten Name: VARCHAR(50) NULL

My input source has these columns -


ID        Description        ParentID
10        Great-Grandfather
1010      Grandfather        10
101010    1. Father A        1010
101011    2. Father B        1010
101012    4. Father C        1010
101013    5. Father D        1010
101015    3. Father E        1010
101018    Father F           1010
101019    6. Father G        1010
101020    Father H           1010
101021    Father I           1010
101022    2A. Father J       1010
10101010  2. Father K        101010
Like the above, I have ID till 20 digits, which means, I have 10 levels.

I want to populate the ID and name itself, along with all of its parents up to
the root, for any particular level, but I am unable to come up with a concrete
logic for it.

I am using this approach to fetch the respective levels and populate them in
the respective columns, but not their parents -

Present Logic ->

FinalJoin_DF = spark.sql("select "
  + "case when length(a.id)/2 = '1' then a.id else ' ' end as level_one_id, "
  + "case when length(a.id)/2 = '1' then a.desc else ' ' end as level_one_name, "
  + "case when length(a.id)/2 = '2' then a.id else ' ' end as level_two_id, "
  + "case when length(a.id)/2 = '2' then a.desc else ' ' end as level_two_name, "
  + "case when length(a.id)/2 = '3' then a.id else ' ' end as level_three_id, "
  + "case when length(a.id)/2 = '3' then a.desc else ' ' end as level_three_name, "
  + "case when length(a.id)/2 = '4' then a.id else ' ' end as level_four_id, "
  + "case when length(a.id)/2 = '4' then a.desc else ' ' end as level_four_name, "
  + "case when length(a.id)/2 = '5' then a.id else ' ' end as level_five_id, "
  + "case when length(a.id)/2 = '5' then a.desc else ' ' end as level_five_name, "
  + "case when length(a.id)/2 = '6' then a.id else ' ' end as level_six_id, "
  + "case when length(a.id)/2 = '6' then a.desc else ' ' end as level_six_name, "
  + "case when length(a.id)/2 = '7' then a.id else ' ' end as level_seven_id, "
  + "case when length(a.id)/2 = '7' then a.desc else ' ' end as level_seven_name, "
  + "case when length(a.id)/2 = '8' then a.id else ' ' end as level_eight_id, "
  + "case when length(a.id)/2 = '8' then a.desc else ' ' end as level_eight_name, "
  + "case when length(a.id)/2 = '9' then a.id else ' ' end as level_nine_id, "
  + "case when length(a.id)/2 = '9' then a.desc else ' ' end as level_nine_name, "
  + "case when length(a.id)/2 = '10' then a.id else ' ' end as level_ten_id, "
  + "case when length(a.id)/2 = '10' then a.desc else ' ' end as level_ten_name "
  + "from CategoryTempTable a")


Can someone help me also populate all the parent levels in the respective
level ID and level name columns, please?


Thanks,
Aakash.
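
A minimal PySpark sketch of one way to also fill in the parent levels (a hedged
illustration, assuming the data is registered as CategoryTempTable with columns
id and desc, and that each level adds two digits to the id as in the sample):
the ancestor at level i is just the first 2*i characters of the id, which can be
joined back to the same table to pick up its name.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
cat = spark.table("CategoryTempTable")          # columns: id, desc, parentid

result = cat.select("id", "desc")
for i in range(1, 11):
    # ancestor id at level i, defined only for rows at level i or deeper
    anc = cat.select(F.col("id").alias(f"level_{i}_id"),
                     F.col("desc").alias(f"level_{i}_name"))
    result = (result
              .withColumn(f"level_{i}_id",
                          F.when(F.length("id") >= 2 * i,
                                 F.substring("id", 1, 2 * i)))
              .join(anc, on=f"level_{i}_id", how="left"))

Levels deeper than the row itself come out as NULL rather than ' ', and the
generated column names (level_1_id, ...) would still need renaming to
level_one_id ... level_ten_name.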



Design aspects of Data partitioning for Window functions

2017-08-30 Thread Vasu Gourabathina
All:

If this question was already discussed, please let me know. I can try to
look into the archive.

Data Characteristics:
entity_id  date  fact_1 fact_2  fact_N   derived_1  derived_2  derived_X

a) There are 1000s of such entities in the system
b) Each one has various fact attributes per day (to begin with). In future, we
want to support multiple entries per day
c) The goal is to calculate various derived attributes... some of them are
window functions, such as average and moving average
d) The total number of rows per entity might not be evenly distributed

Question:
1) What's the best way to partition the data for better performance? Anything
to consider given point #d above?

Sample code:
The following code seems to work fine on a smaller sample size:
  from pyspark.sql.window import Window
  from pyspark.sql.functions import mean

  # trailing 30-row window per entity, ordered by date
  window = Window.partitionBy('entity_id').orderBy('date').rowsBetween(-30, 0)
  moving_avg = mean(df['fact_1']).over(window)
  df2 = df.withColumn('derived_moving_avg', moving_avg)
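
Regarding point #d, a quick hedged check of how evenly the rows are spread per
entity before relying on Window.partitionBy('entity_id') (df as above):

from pyspark.sql import functions as F

per_entity = df.groupBy('entity_id').count().withColumnRenamed('count', 'n')
per_entity.agg(F.min('n'), F.avg('n'), F.max('n')).show()

A maximum far above the average suggests a skewed entity that will dominate one
task of the window computation.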

Please advise if there are any aspects that need to be considered to make
it efficient to run on a larger data size (with N-node spark cluster).

Thanks in advance,
Vasu.


Informing Spark about specific Partitioning scheme to avoid shuffles

2017-07-22 Thread saatvikshah1994
Hi everyone,

My environment is PySpark with Spark 2.0.0. 

I'm using Spark to load data from a large number of files into a Spark
dataframe with fields, say, field1 to field10. While loading my data I have
ensured that records are partitioned by field1 and field2 (without using
partitionBy). This was done when loading the data into an RDD of lists,
before the .toDF() call. So I assume Spark would not already know that such
a partitioning exists, and it might trigger a shuffle if I call a shuffling
transform using field1 or field2 as keys and then cache that information.

Is it possible to inform Spark, once I've created the dataframe, about my
custom partitioning scheme? Or would Spark have already discovered this
somehow before the shuffling transform call?
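
For what it's worth, a hedged sketch of the two usual options (rows_rdd, df and
the partition count 200 are illustrative placeholders): Spark only tracks a
partitioner on an RDD when one is attached explicitly, and a DataFrame built via
toDF() carries no partitioning information, so an explicit repartition is the
way to tell the optimizer about the distribution, at the cost of one up-front
shuffle that later operations on the same keys can reuse.

# RDD level: attach a partitioner so a later reduceByKey/groupByKey on the
# same keys does not shuffle again.
keyed = rows_rdd.map(lambda r: ((r[0], r[1]), r)).partitionBy(200)

# DataFrame level: one explicit shuffle on the keys, reused by later
# aggregations or joins on field1/field2.
df = df.repartition(200, 'field1', 'field2').cache()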





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Informing-Spark-about-specific-Partitioning-scheme-to-avoid-shuffles-tp28922.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: How does partitioning happen for binary files in spark ?

2017-04-06 Thread Jay
The code that you see on GitHub is for version 2.1. For versions below that,
the default number of partitions for binary files is set to 2, which you can
change by using the minPartitions value. I am not sure how the minPartitions
parameter will work starting with 2.1 because, as you said, the field is
completely ignored.

Thanks,
Jayadeep

On Thu, Apr 6, 2017 at 3:43 PM, ashwini anand <aayan...@gmail.com> wrote:

> By looking into the source code, I found that for textFile(), the
> partitioning is computed by the computeSplitSize() function in
> FileInputFormat class. This function takes into consideration the
> minPartitions value passed by user. As per my understanding , the same
> thing for binaryFiles() is computed by the setMinPartitions() function of
> PortableDataStream class. This setMinPartitions() function completely
> ignores the minPartitions value passed by user. However I find that in my
> application somehow the partition varies based on the minPartition value in
> case of binaryFiles() too. I have no idea how this is happening. Please
> help me understand how the partitioning happens in case of binaryFiles().
>
> source code for setMinPartitions() is as below:
>
> def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
>   val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
>   val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
>   val defaultParallelism = sc.defaultParallelism
>   val files = listStatus(context).asScala
>   val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
>   val bytesPerCore = totalBytes / defaultParallelism
>   val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
>   super.setMaxSplitSize(maxSplitSize)
> }
> --
> View this message in context: How does partitioning happen for binary
> files in spark ?
> <http://apache-spark-user-list.1001560.n3.nabble.com/How-does-partitioning-happen-for-binary-files-in-spark-tp28575.html>
> Sent from the Apache Spark User List mailing list archive
> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>



How does partitioning happen for binary files in spark ?

2017-04-06 Thread ashwini anand
By looking into the source code, I found that for textFile(), the
partitioning is computed by the computeSplitSize() function in
FileInputFormat class. This function takes into consideration the
minPartitions value passed by user. As per my understanding , the same thing
for binaryFiles() is computed by the setMinPartitions() function of
PortableDataStream class. This setMinPartitions() function completely
ignores the minPartitions value passed by user. However I find that in my
application somehow the partition varies based on the minPartition value in
case of binaryFiles() too. I have no idea how this is happening. Please help
me understand how the partitioning happens in the case of binaryFiles().

source code for setMinPartitions() is as below:

def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
  val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
  val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
  val defaultParallelism = sc.defaultParallelism
  val files = listStatus(context).asScala
  val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
  val bytesPerCore = totalBytes / defaultParallelism
  val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
  super.setMaxSplitSize(maxSplitSize)
}
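
For illustration, a hedged usage sketch (assuming an existing SparkContext sc;
the path is a placeholder). As discussed above, minPartitions is at best a hint,
and the resulting partition count also depends on the total input size and the
config values in the snippet:

rdd = sc.binaryFiles("hdfs:///path/to/files", minPartitions=8)
print(rdd.getNumPartitions())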



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-does-partitioning-happen-for-binary-files-in-spark-tp28575.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Partitioning strategy

2017-04-02 Thread Jörn Franke
You can always repartition, but for your use case it could also make sense to keep 
different RDDs with the same data but different partitioning strategies. It may also 
make sense to choose an appropriate format on disk (ORC, Parquet). You have to choose 
based on the users' non-functional requirements as well.
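
As a hedged illustration of the repartition route (names and numbers are
placeholders): after a time-based filter the original partitions are kept but may
be mostly empty, so they can be shrunk or re-spread explicitly.

filtered = rdd.filter(lambda rec: rec.year == 2016)   # rec.year is a placeholder
compacted = filtered.coalesce(5)        # fewer, fuller partitions, no shuffle
rebalanced = filtered.repartition(20)   # even spread, at the cost of a shuffle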

> On 2. Apr 2017, at 12:32, <jasbir.s...@accenture.com> 
> <jasbir.s...@accenture.com> wrote:
> 
> Hi,
>  
> I have RDD with 4 years’ data with suppose 20 partitions. On runtime, user 
> can decide to select few months or years of RDD. That means, based upon user 
> time selection RDD is being filtered and on filtered RDD further 
> transformations and actions are performed. And, as spark says, child RDD get 
> partitions from parent RDD.
>  
> Therefore, is there any way to decide partitioning strategy after filter 
> operations?
>  
> Regards,
> Jasbir Singh
> 
> 


Partitioning strategy

2017-04-02 Thread jasbir.sing
Hi,

I have an RDD with 4 years' data and, say, 20 partitions. At runtime, the user can 
decide to select a few months or years of the RDD. That means the RDD is filtered 
based on the user's time selection, and further transformations and actions are 
performed on the filtered RDD. And, as Spark documents, a child RDD gets its 
partitions from the parent RDD.

Therefore, is there any way to decide the partitioning strategy after filter 
operations?

Regards,
Jasbir Singh





Partitioning in spark while reading from RDBMS via JDBC

2017-03-31 Thread Devender Yadav
Hi All,


I am running spark in cluster mode and reading data from RDBMS via JDBC.

As per spark 
docs<http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases>,
 these partitioning parameters describe how to partition the table when reading 
in parallel from multiple workers:

partitionColumn,
lowerBound,
upperBound,
numPartitions


These are optional parameters.

What would happen if I don't specify these:

  *   Would only 1 worker read the whole data?
  *   If it still reads in parallel, how does it partition the data?
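
For reference, a hedged sketch (connection details are placeholders): if none of
the four options are set, Spark reads the whole table through a single JDBC
connection into a single partition; setting all four splits the read into
numPartitions range queries on partitionColumn.

df = (spark.read.format("jdbc")
      .option("url", "jdbc:postgresql://host:5432/db")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .option("partitionColumn", "id")
      .option("lowerBound", "1")
      .option("upperBound", "1000000")
      .option("numPartitions", "10")
      .load())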



Regards,
Devender










RE: Huge partitioning job takes longer to close after all tasks finished

2017-03-09 Thread PSwain
Hi Swapnil,

  We are facing the same issue. Could you please let me know how you found that the 
partitions are getting merged?

Thanks in advance !!

From: Swapnil Shinde [mailto:swapnilushi...@gmail.com]
Sent: Thursday, March 09, 2017 1:31 AM
To: cht liu <liucht...@gmail.com>
Cc: user@spark.apache.org
Subject: Re: Huge partitioning job takes longer to close after all tasks 
finished

Thank you liu. Can you please explain what do you mean by enabling spark fault 
tolerant mechanism?
I observed that after all tasks finishes, spark is working on concatenating 
same partitions from all tasks on file system. eg,
task1 - partition1, partition2, partition3
task2 - partition1, partition2, partition3

Then after task1, task2 finishes, spark concatenates partition1 from task1, 
task2 to create partition1. This is taking longer if we have large number of 
files. I am not sure if there is a way to let spark not to concatenate 
partitions from each task.

Thanks
Swapnil


On Tue, Mar 7, 2017 at 10:47 PM, cht liu 
<liucht...@gmail.com<mailto:liucht...@gmail.com>> wrote:

Do you enable the spark fault tolerance mechanism, RDD run at the end of the 
job, will start a separate job, to the checkpoint data written to the file 
system before the persistence of high availability

2017-03-08 2:45 GMT+08:00 Swapnil Shinde 
<swapnilushi...@gmail.com<mailto:swapnilushi...@gmail.com>>:
Hello all
   I have a spark job that reads parquet data and partition it based on one of 
the columns. I made sure partitions equally distributed and not skewed. My code 
looks like this -

datasetA.write.partitonBy("column1").parquet(outputPath)

Execution plan -
[Inline image 1]

All tasks(~12,000) finishes in 30-35 mins but it takes another 40-45 mins to 
close application. I am not sure what spark is doing after all tasks are 
processes successfully.
I checked thread dump (using UI executor tab) on few executors but couldnt find 
anything major. Overall, few shuffle-client processes are "RUNNABLE" and few 
dispatched-* processes are "WAITING".

Please let me know what spark is doing at this stage(after all tasks finished) 
and any way I can optimize it.

Thanks
Swapnil








Re: Huge partitioning job takes longer to close after all tasks finished

2017-03-09 Thread Gourav Sengupta
Hi,

you are definitely not using Spark 2.1 in the way it should be used.

Try using sessions and follow their guidelines; this issue was specifically
resolved as part of the Spark 2.1 release.


Regards,
Gourav

On Wed, Mar 8, 2017 at 8:00 PM, Swapnil Shinde 
wrote:

> Thank you liu. Can you please explain what do you mean by enabling spark
> fault tolerant mechanism?
> I observed that after all tasks finishes, spark is working on
> concatenating same partitions from all tasks on file system. eg,
> task1 - partition1, partition2, partition3
> task2 - partition1, partition2, partition3
>
> Then after task1, task2 finishes, spark concatenates partition1 from
> task1, task2 to create partition1. This is taking longer if we have large
> number of files. I am not sure if there is a way to let spark not to
> concatenate partitions from each task.
>
> Thanks
> Swapnil
>
>
> On Tue, Mar 7, 2017 at 10:47 PM, cht liu  wrote:
>
>> Do you enable the spark fault tolerance mechanism, RDD run at the end of
>> the job, will start a separate job, to the checkpoint data written to the
>> file system before the persistence of high availability
>>
>> 2017-03-08 2:45 GMT+08:00 Swapnil Shinde :
>>
>>> Hello all
>>>I have a spark job that reads parquet data and partition it based on
>>> one of the columns. I made sure partitions equally distributed and not
>>> skewed. My code looks like this -
>>>
>>> datasetA.write.partitonBy("column1").parquet(outputPath)
>>>
>>> Execution plan -
>>> [image: Inline image 1]
>>>
>>> All tasks(~12,000) finishes in 30-35 mins but it takes another 40-45
>>> mins to close application. I am not sure what spark is doing after all
>>> tasks are processes successfully.
>>> I checked thread dump (using UI executor tab) on few executors but
>>> couldnt find anything major. Overall, few shuffle-client processes are
>>> "RUNNABLE" and few dispatched-* processes are "WAITING".
>>>
>>> Please let me know what spark is doing at this stage(after all tasks
>>> finished) and any way I can optimize it.
>>>
>>> Thanks
>>> Swapnil
>>>
>>>
>>>
>>
>


Re: Huge partitioning job takes longer to close after all tasks finished

2017-03-08 Thread Swapnil Shinde
Thank you, liu. Can you please explain what you mean by enabling the Spark
fault tolerance mechanism?
I observed that after all tasks finish, Spark is busy concatenating the same
partitions from all tasks on the file system, e.g.,
task1 - partition1, partition2, partition3
task2 - partition1, partition2, partition3

Then, after task1 and task2 finish, Spark concatenates partition1 from task1
and task2 to create partition1. This takes longer if we have a large number
of files. I am not sure if there is a way to tell Spark not to concatenate
partitions from each task.

Thanks
Swapnil


On Tue, Mar 7, 2017 at 10:47 PM, cht liu  wrote:

> Do you enable the spark fault tolerance mechanism, RDD run at the end of
> the job, will start a separate job, to the checkpoint data written to the
> file system before the persistence of high availability
>
> 2017-03-08 2:45 GMT+08:00 Swapnil Shinde :
>
>> Hello all
>>I have a spark job that reads parquet data and partition it based on
>> one of the columns. I made sure partitions equally distributed and not
>> skewed. My code looks like this -
>>
>> datasetA.write.partitonBy("column1").parquet(outputPath)
>>
>> Execution plan -
>> [image: Inline image 1]
>>
>> All tasks(~12,000) finishes in 30-35 mins but it takes another 40-45 mins
>> to close application. I am not sure what spark is doing after all tasks are
>> processes successfully.
>> I checked thread dump (using UI executor tab) on few executors but
>> couldnt find anything major. Overall, few shuffle-client processes are
>> "RUNNABLE" and few dispatched-* processes are "WAITING".
>>
>> Please let me know what spark is doing at this stage(after all tasks
>> finished) and any way I can optimize it.
>>
>> Thanks
>> Swapnil
>>
>>
>>
>


Re: Huge partitioning job takes longer to close after all tasks finished

2017-03-07 Thread cht liu
Do you have the Spark fault tolerance mechanism enabled? At the end of the job,
RDD checkpointing will start a separate job to write the checkpoint data to the
file system for high-availability persistence.

2017-03-08 2:45 GMT+08:00 Swapnil Shinde :

> Hello all
>I have a spark job that reads parquet data and partition it based on
> one of the columns. I made sure partitions equally distributed and not
> skewed. My code looks like this -
>
> datasetA.write.partitonBy("column1").parquet(outputPath)
>
> Execution plan -
> [image: Inline image 1]
>
> All tasks(~12,000) finishes in 30-35 mins but it takes another 40-45 mins
> to close application. I am not sure what spark is doing after all tasks are
> processes successfully.
> I checked thread dump (using UI executor tab) on few executors but couldnt
> find anything major. Overall, few shuffle-client processes are "RUNNABLE"
> and few dispatched-* processes are "WAITING".
>
> Please let me know what spark is doing at this stage(after all tasks
> finished) and any way I can optimize it.
>
> Thanks
> Swapnil
>
>
>


Huge partitioning job takes longer to close after all tasks finished

2017-03-07 Thread Swapnil Shinde
Hello all
   I have a spark job that reads parquet data and partitions it based on one
of the columns. I made sure the partitions are equally distributed and not skewed.
My code looks like this -

datasetA.write.partitionBy("column1").parquet(outputPath)

Execution plan -
[image: Inline image 1]

All tasks(~12,000) finishes in 30-35 mins but it takes another 40-45 mins
to close application. I am not sure what spark is doing after all tasks are
processes successfully.
I checked thread dump (using UI executor tab) on few executors but couldnt
find anything major. Overall, few shuffle-client processes are "RUNNABLE"
and few dispatched-* processes are "WAITING".

Please let me know what spark is doing at this stage(after all tasks
finished) and any way I can optimize it.
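
A hedged aside (not something confirmed in this thread): a large part of the
time after the last task can be the output committer serially renaming each
task's files into the final partition directories. On Hadoop 2.7+, the v2 commit
algorithm moves files as each task commits instead, for example:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
         .getOrCreate())
datasetA.write.partitionBy("column1").parquet(outputPath)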

Thanks
Swapnil


Re: Kafka Streaming and partitioning

2017-02-26 Thread tonyye
Hi Dave,
I had the same question and was wondering if you had found a way to do the
join without causing a shuffle?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Streaming-and-partitioning-tp25955p28425.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: forcing dataframe groupby partitioning

2017-01-29 Thread Mendelson, Assaf
Could you explain why this would work?
Assaf.

From: Haviv, Daniel [mailto:dha...@amazon.com]
Sent: Sunday, January 29, 2017 7:09 PM
To: Mendelson, Assaf
Cc: user@spark.apache.org
Subject: Re: forcing dataframe groupby partitioning

If there's no built in local groupBy, You could do something like that:
df.groupby(C1,C2).agg(...).flatmap(x=>x.groupBy(C1)).agg

Thank you.
Daniel

On 29 Jan 2017, at 18:33, Mendelson, Assaf 
<assaf.mendel...@rsa.com<mailto:assaf.mendel...@rsa.com>> wrote:
Hi,

Consider the following example:

df.groupby(C1,C2).agg(some agg).groupby(C1).agg(some more agg)

The default way spark would behave would be to shuffle according to a 
combination of C1 and C2 and then shuffle again by C1 only.
This behavior makes sense when one uses C2 to salt C1 for skewed data, however, 
when this is part of the logic the cost of doing two shuffles instead of one is 
not insignificant (even worse when multiple iterations are done).

Is there a way to force the first groupby to shuffle the data so that the 
second groupby would occur within the partition?

Thanks,
Assaf.


Re: forcing dataframe groupby partitioning

2017-01-29 Thread Haviv, Daniel
If there's no built-in local groupBy, you could do something like this:
df.groupby(C1,C2).agg(...).flatmap(x=>x.groupBy(C1)).agg

Thank you.
Daniel

On 29 Jan 2017, at 18:33, Mendelson, Assaf 
> wrote:

Hi,

Consider the following example:

df.groupby(C1,C2).agg(some agg).groupby(C1).agg(some more agg)

The default way spark would behave would be to shuffle according to a 
combination of C1 and C2 and then shuffle again by C1 only.
This behavior makes sense when one uses C2 to salt C1 for skewed data, however, 
when this is part of the logic the cost of doing two shuffles instead of one is 
not insignificant (even worse when multiple iterations are done).

Is there a way to force the first groupby to shuffle the data so that the 
second groupby would occur within the partition?

Thanks,
Assaf.


forcing dataframe groupby partitioning

2017-01-29 Thread Mendelson, Assaf
Hi,

Consider the following example:

df.groupby(C1,C2).agg(some agg).groupby(C1).agg(some more agg)

By default, Spark would shuffle according to the combination of C1 and C2 and then 
shuffle again by C1 only.
This behavior makes sense when one uses C2 to salt C1 for skewed data; however, when 
this is part of the logic, the cost of doing two shuffles instead of one is not 
insignificant (even worse when multiple iterations are done).

Is there a way to force the first groupby to shuffle the data so that the 
second groupby would occur within the partition?
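
A hedged sketch of one workaround (column names from the mail; the value column
"v" and the partition count 200 are placeholders): repartition by C1 up front so
both aggregations see data already distributed by C1. Whether the optimizer
actually drops the second exchange should be verified with explain() on the
Spark version in use.

from pyspark.sql import functions as F

df2 = (df.repartition(200, "C1")
         .groupBy("C1", "C2").agg(F.sum("v").alias("s"))
         .groupBy("C1").agg(F.sum("s").alias("total")))
df2.explain()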

Thanks,
Assaf.


Re: Spark Partitioning Strategy with Parquet

2016-12-30 Thread titli batali
Yeah, it works for me.

Thanks

On Fri, Nov 18, 2016 at 3:08 AM, ayan guha <guha.a...@gmail.com> wrote:

> Hi
>
> I think you can use map reduce paradigm here. Create a key  using user ID
> and date and record as a value. Then you can express your operation (do
> something) part as a function. If the function meets certain criteria such
> as associative and cumulative like, say Add or multiplication, you can use
> reducebykey, else you may use groupbykey.
>
> HTH
> On 18 Nov 2016 06:45, "titli batali" <titlibat...@gmail.com> wrote:
>
>>
>> That would help but again in a particular partitions i would need to a
>> iterate over the customers having first n letters of user id in that
>> partition. I want to get rid of nested iterations.
>>
>> Thanks
>>
>> On Thu, Nov 17, 2016 at 10:28 PM, Xiaomeng Wan <shawn...@gmail.com>
>> wrote:
>>
>>> You can partitioned on the first n letters of userid
>>>
>>> On 17 November 2016 at 08:25, titli batali <titlibat...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a use case, where we have 1000 csv files with a column user_Id,
>>>> having 8 million unique users. The data contains: userid,date,transaction,
>>>> where we run some queries.
>>>>
>>>> We have a case where we need to iterate for each transaction in a
>>>> particular date for each user. There is three nesting loops
>>>>
>>>> for(user){
>>>>   for(date){
>>>> for(transactions){
>>>>   //Do Something
>>>>   }
>>>>}
>>>> }
>>>>
>>>> i.e we do similar thing for every (date,transaction) tuple for a
>>>> particular user. In order to get away with loop structure and decrease the
>>>> processing time We are converting converting the csv files to parquet and
>>>> partioning it with userid, df.write.format("parquet").par
>>>> titionBy("useridcol").save("hdfs://path").
>>>>
>>>> So that while reading the parquet files, we read a particular user in a
>>>> particular partition and create a Cartesian product of (date X transaction)
>>>> and work on the tuple in each partition, to achieve the above level of
>>>> nesting. Partitioning on 8 million users is it a bad option. What could be
>>>> a better way to achieve this?
>>>>
>>>> Thanks
>>>>
>>>>
>>>>
>>>
>>>
>>


Parallel dynamic partitioning producing duplicated data

2016-11-30 Thread Mehdi Ben Haj Abbes
Hi Folks,



I have a spark job reading a csv file into a dataframe. I register that
dataframe as a tempTable, and then I write that dataframe/tempTable to a hive
external table (using parquet format for storage).

I’m using this kind of command:

hiveContext.sql("INSERT INTO TABLE t PARTITION(statPart='string_value',
dynPart) SELECT * FROM tempTable");



Through this integration, for each csv line I will get one parquet
line/record. So if I count the total number of lines in the csv files, it must
equal the count of the parquet dataset produced.



I launch 20 of these jobs in parallel (to take advantage of idle resources).
Sometimes the parquet count is randomly slightly bigger than the csv count
(mostly the difference concerns one dynamic partition and one csv file that has
been integrated), but if I launch these jobs sequentially, one after the other,
I never get the problem of a different count.



Does anyone have any idea about the cause of this problem (the different
count)? To me it is obvious that the parallel execution is causing the issue,
and I strongly believe it happens when moving data from the
hive.exec.stagingdir.prefix dir to the hive final table location on hdfs.



Thanks in advance.


RE: CSV to parquet preserving partitioning

2016-11-23 Thread benoitdr
Best solution I've found so far (no shuffling, and as many threads as input
dirs):

1. Create an RDD of input dirs, with as many partitions as input dirs
2. Transform it to an RDD of input files (preserving the partitioning by dir)
3. Flat-map it with a custom csv parser
4. Convert the RDD to a dataframe
5. Write the dataframe to a parquet table partitioned by dir

It requires writing your own parser. I could not find a solution that preserves
the partitioning using sc.textFile or the databricks csv parser.
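
A hedged sketch of that outline (paths are placeholders, parse_line() stands in
for the custom parser, and it assumes the input directories are visible from
every executor):

import glob
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

dirs = ["/path/dir1", "/path/dir2", "/path/dir3"]

def parse_line(line, d):
    # placeholder parser: split the csv line and tag it with its directory
    c1, c2 = line.rstrip("\n").split(",", 1)
    return Row(col1=c1, col2=c2, dir=d)

def read_dir(d):
    # runs on the executor that owns this directory's partition
    for f in glob.glob(d + "/*.csv"):
        with open(f) as fh:
            for line in fh:
                yield parse_line(line, d)

rows = sc.parallelize(dirs, len(dirs)).flatMap(read_dir)  # one partition per dir
df = spark.createDataFrame(rows)
df.write.partitionBy("dir").parquet("hdfs://path")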



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078p28120.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: CSV to parquet preserving partitioning

2016-11-18 Thread benoitdr
This is more or less how I'm doing it now.
The problem is that it creates shuffling in the cluster, because the input data
are not collocated according to the partition scheme.

If I reload the output parquet files as a new dataframe, then everything is
fine, but I'd like to avoid shuffling during the ETL phase as well.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078p28103.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark Partitioning Strategy with Parquet

2016-11-17 Thread ayan guha
Hi

I think you can use the map-reduce paradigm here. Create a key using user ID
and date, with the record as the value. Then you can express your operation
(the "do something" part) as a function. If the function meets certain
criteria, such as being associative and commutative (like, say, addition or
multiplication), you can use reduceByKey; otherwise you may use groupByKey.
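
A minimal sketch of that shape (column names taken from the question below; the
lambda is a placeholder and must be associative and commutative for reduceByKey):

pairs = df.rdd.map(lambda r: ((r['userid'], r['date']), r['transaction']))
per_user_day = pairs.reduceByKey(lambda a, b: a + b)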

HTH
On 18 Nov 2016 06:45, "titli batali" <titlibat...@gmail.com> wrote:

>
> That would help but again in a particular partitions i would need to a
> iterate over the customers having first n letters of user id in that
> partition. I want to get rid of nested iterations.
>
> Thanks
>
> On Thu, Nov 17, 2016 at 10:28 PM, Xiaomeng Wan <shawn...@gmail.com> wrote:
>
>> You can partitioned on the first n letters of userid
>>
>> On 17 November 2016 at 08:25, titli batali <titlibat...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a use case, where we have 1000 csv files with a column user_Id,
>>> having 8 million unique users. The data contains: userid,date,transaction,
>>> where we run some queries.
>>>
>>> We have a case where we need to iterate for each transaction in a
>>> particular date for each user. There is three nesting loops
>>>
>>> for(user){
>>>   for(date){
>>> for(transactions){
>>>   //Do Something
>>>   }
>>>}
>>> }
>>>
>>> i.e we do similar thing for every (date,transaction) tuple for a
>>> particular user. In order to get away with loop structure and decrease the
>>> processing time We are converting converting the csv files to parquet and
>>> partioning it with userid, df.write.format("parquet").par
>>> titionBy("useridcol").save("hdfs://path").
>>>
>>> So that while reading the parquet files, we read a particular user in a
>>> particular partition and create a Cartesian product of (date X transaction)
>>> and work on the tuple in each partition, to achieve the above level of
>>> nesting. Partitioning on 8 million users is it a bad option. What could be
>>> a better way to achieve this?
>>>
>>> Thanks
>>>
>>>
>>>
>>
>>
>


Re: Spark Partitioning Strategy with Parquet

2016-11-17 Thread titli batali
That would help, but again, within a particular partition I would need to
iterate over the customers having the first n letters of user id in that
partition. I want to get rid of nested iterations.

Thanks

On Thu, Nov 17, 2016 at 10:28 PM, Xiaomeng Wan <shawn...@gmail.com> wrote:

> You can partitioned on the first n letters of userid
>
> On 17 November 2016 at 08:25, titli batali <titlibat...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a use case, where we have 1000 csv files with a column user_Id,
>> having 8 million unique users. The data contains: userid,date,transaction,
>> where we run some queries.
>>
>> We have a case where we need to iterate for each transaction in a
>> particular date for each user. There is three nesting loops
>>
>> for(user){
>>   for(date){
>> for(transactions){
>>   //Do Something
>>   }
>>}
>> }
>>
>> i.e we do similar thing for every (date,transaction) tuple for a
>> particular user. In order to get away with loop structure and decrease the
>> processing time We are converting converting the csv files to parquet and
>> partioning it with userid, df.write.format("parquet").par
>> titionBy("useridcol").save("hdfs://path").
>>
>> So that while reading the parquet files, we read a particular user in a
>> particular partition and create a Cartesian product of (date X transaction)
>> and work on the tuple in each partition, to achieve the above level of
>> nesting. Partitioning on 8 million users is it a bad option. What could be
>> a better way to achieve this?
>>
>> Thanks
>>
>>
>>
>
>


Re: Spark Partitioning Strategy with Parquet

2016-11-17 Thread Xiaomeng Wan
You can partition on the first n letters of userid

On 17 November 2016 at 08:25, titli batali <titlibat...@gmail.com> wrote:

> Hi,
>
> I have a use case, where we have 1000 csv files with a column user_Id,
> having 8 million unique users. The data contains: userid,date,transaction,
> where we run some queries.
>
> We have a case where we need to iterate for each transaction in a
> particular date for each user. There is three nesting loops
>
> for(user){
>   for(date){
> for(transactions){
>   //Do Something
>   }
>}
> }
>
> i.e we do similar thing for every (date,transaction) tuple for a
> particular user. In order to get away with loop structure and decrease the
> processing time We are converting converting the csv files to parquet and
> partioning it with userid, df.write.format("parquet").
> partitionBy("useridcol").save("hdfs://path").
>
> So that while reading the parquet files, we read a particular user in a
> particular partition and create a Cartesian product of (date X transaction)
> and work on the tuple in each partition, to achieve the above level of
> nesting. Partitioning on 8 million users is it a bad option. What could be
> a better way to achieve this?
>
> Thanks
>
>
>


Fwd: Spark Partitioning Strategy with Parquet

2016-11-17 Thread titli batali
Hi,

I have a use case where we have 1000 csv files with a column user_Id,
having 8 million unique users. The data contains: userid, date, transaction,
on which we run some queries.

We have a case where we need to iterate over each transaction on a
particular date for each user. There are three nested loops:

for(user){
  for(date){
    for(transactions){
      //Do Something
    }
  }
}

i.e. we do a similar thing for every (date, transaction) tuple for a particular
user. In order to get away from the loop structure and decrease the processing
time, we are converting the csv files to parquet and partitioning them by
userid:
df.write.format("parquet").partitionBy("useridcol").save("hdfs://path").

So while reading the parquet files, we read a particular user in a particular
partition, create a Cartesian product of (date X transaction), and work on the
tuples in each partition, to achieve the above level of nesting. Is
partitioning on 8 million users a bad option? What could be a better way to
achieve this?

Thanks


RE: CSV to parquet preserving partitioning

2016-11-16 Thread neil90
All you need to do is load all the files into one dataframe at once. Then
save the dataframe using partitionBy -

df.write.format("parquet").partitionBy("directoryCol").save("hdfs://path")

Then if you look at the new folder it should look like how you want it I.E -
hdfs://path/dir=dir1/part-r-xxx.gz.parquet 
hdfs://path/dir=dir2/part-r-yyy.gz.parquet 
hdfs://path/dir=dir3/part-r-zzz.gz.parquet 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078p28087.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: CSV to parquet preserving partitioning

2016-11-16 Thread benoitdr
Yes, by parsing the file content, it's possible to recover which directory they 
are in.

From: neil90 [via Apache Spark User List] 
[mailto:ml-node+s1001560n28083...@n3.nabble.com]
Sent: mercredi 16 novembre 2016 17:41
To: Drooghaag, Benoit (Nokia - BE) <benoit.droogh...@nokia.com>
Subject: Re: CSV to parquet preserving partitioning

Is there anything in the files to let you know which directory they should be 
in?

If you reply to this email, your message will be added to the discussion below:
http://apache-spark-user-list.1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078p28083.html




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078p28084.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: CSV to parquet preserving partitioning

2016-11-16 Thread neil90
Is there anything in the files to let you know which directory they should be
in?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078p28083.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: CSV to parquet preserving partitioning

2016-11-16 Thread Drooghaag, Benoit (Nokia - BE)
Good point, thanks !

That does the job, as long as the dataset corresponding to each input directory 
contains a single partition.

The question now is how to achieve this without shuffling the data.
I’m using the databricks csv reader on spark 1.6 and I don’t think there is a 
way to control the partitioning.
As far as I can see, it creates one partition per csv file, so the data from one 
input directory can be scattered across the nodes ...

From: Daniel Siegmann [mailto:dsiegm...@securityscorecard.io]
Sent: mardi 15 novembre 2016 18:57
To: Drooghaag, Benoit (Nokia - BE) <benoit.droogh...@nokia.com>
Cc: user <user@spark.apache.org>
Subject: Re: CSV to parquet preserving partitioning

Did you try unioning the datasets for each CSV into a single dataset? You may 
need to put the directory name into a column so you can partition by it.
On Tue, Nov 15, 2016 at 8:44 AM, benoitdr 
<benoit.droogh...@nokia.com<mailto:benoit.droogh...@nokia.com>> wrote:
Hello,

I'm trying to convert a bunch of csv files to parquet, with the interesting
case that the input csv files are already "partitioned" by directory.
All the input files have the same set of columns.
The input files structure looks like :

/path/dir1/file1.csv
/path/dir1/file2.csv
/path/dir2/file3.csv
/path/dir3/file4.csv
/path/dir3/file5.csv
/path/dir3/file6.csv

I'd like to read those files and write their data to a parquet table in
hdfs, preserving the partitioning (partitioned by input directory), and such
as there is a single output file per partition.
The output files strucutre should look like :

hdfs://path/dir=dir1/part-r-xxx.gz.parquet
hdfs://path/dir=dir2/part-r-yyy.gz.parquet
hdfs://path/dir=dir3/part-r-zzz.gz.parquet


The best solution I have found so far is to loop among the input
directories, loading the csv files in a dataframe and to write the dataframe
in the target partition.
But this not efficient since I want a single output file per partition, the
writing to hdfs is a single tasks that blocks the loop.
I wonder how to achieve this with a maximum of parallelism (and without
shuffling the data in the cluster).

Thanks !



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: CSV to parquet preserving partitioning

2016-11-15 Thread Daniel Siegmann
Did you try unioning the datasets for each CSV into a single dataset? You
may need to put the directory name into a column so you can partition by it.
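
A hedged sketch of that idea (assuming the built-in csv reader of Spark 2.0+;
paths are placeholders): derive the directory name from the file path instead of
looping over the directories. This does not by itself guarantee a single output
file per directory.

from pyspark.sql import functions as F

df = spark.read.csv("/path/*/*.csv", header=True)
df = df.withColumn("dir",
                   F.regexp_extract(F.input_file_name(), "/([^/]+)/[^/]+$", 1))
df.write.partitionBy("dir").parquet("hdfs://path")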

On Tue, Nov 15, 2016 at 8:44 AM, benoitdr <benoit.droogh...@nokia.com>
wrote:

> Hello,
>
> I'm trying to convert a bunch of csv files to parquet, with the interesting
> case that the input csv files are already "partitioned" by directory.
> All the input files have the same set of columns.
> The input files structure looks like :
>
> /path/dir1/file1.csv
> /path/dir1/file2.csv
> /path/dir2/file3.csv
> /path/dir3/file4.csv
> /path/dir3/file5.csv
> /path/dir3/file6.csv
>
> I'd like to read those files and write their data to a parquet table in
> hdfs, preserving the partitioning (partitioned by input directory), and
> such
> as there is a single output file per partition.
> The output files strucutre should look like :
>
> hdfs://path/dir=dir1/part-r-xxx.gz.parquet
> hdfs://path/dir=dir2/part-r-yyy.gz.parquet
> hdfs://path/dir=dir3/part-r-zzz.gz.parquet
>
>
> The best solution I have found so far is to loop among the input
> directories, loading the csv files in a dataframe and to write the
> dataframe
> in the target partition.
> But this not efficient since I want a single output file per partition, the
> writing to hdfs is a single tasks that blocks the loop.
> I wonder how to achieve this with a maximum of parallelism (and without
> shuffling the data in the cluster).
>
> Thanks !
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


CSV to parquet preserving partitioning

2016-11-15 Thread benoitdr
Hello,

I'm trying to convert a bunch of csv files to parquet, with the interesting
case that the input csv files are already "partitioned" by directory.
All the input files have the same set of columns.
The input files structure looks like :

/path/dir1/file1.csv
/path/dir1/file2.csv
/path/dir2/file3.csv
/path/dir3/file4.csv
/path/dir3/file5.csv
/path/dir3/file6.csv

I'd like to read those files and write their data to a parquet table in
hdfs, preserving the partitioning (partitioned by input directory), such that
there is a single output file per partition.
The output files structure should look like:

hdfs://path/dir=dir1/part-r-xxx.gz.parquet
hdfs://path/dir=dir2/part-r-yyy.gz.parquet
hdfs://path/dir=dir3/part-r-zzz.gz.parquet


The best solution I have found so far is to loop over the input directories,
loading the csv files into a dataframe and writing the dataframe into the
target partition.
But this is not efficient: since I want a single output file per partition, the
write to hdfs is a single task that blocks the loop.
I wonder how to achieve this with maximum parallelism (and without shuffling
the data in the cluster).

Thanks !



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: MapWithState partitioning

2016-10-31 Thread Andrii Biletskyi
Thanks,

As I understand it, for the Kafka case the way to do it is to define my own
kafka.Partitioner that is used when data is produced to Kafka, and to reuse
this partitioner as a spark.Partitioner in the mapWithState spec.

I think I'll stick with that.

Thanks,
Andrii

2016-10-31 16:55 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

> You may know that those streams share the same keys, but Spark doesn't
> unless you tell it.
>
> mapWithState takes a StateSpec, which should allow you to specify a
> partitioner.
>
> On Mon, Oct 31, 2016 at 9:40 AM, Andrii Biletskyi <andrb...@gmail.com>
> wrote:
> > Thanks for response,
> >
> > So as I understand there is no way to "tell" mapWithState leave the
> > partitioning schema as any other transformation would normally do.
> > Then I would like to clarify if there is a simple way to do a
> transformation
> > to a key-value stream and specify somehow the Partitioner that
> effectively
> > would result in the same partitioning schema as the original stream.
> > I.e.:
> >
> > stream.mapPartitions({ crs =>
> >   crs.map { cr =>
> > cr.key() -> cr.value()
> >   }
> > }) <--- specify somehow Partitioner here for the resulting rdd.
> >
> >
> > The reason I ask is that it simply looks strange to me that Spark will
> have
> > to shuffle each time my input stream and "state" stream during the
> > mapWithState operation when I now for sure that those two streams will
> > always share same keys and will not need access to others partitions.
> >
> > Thanks,
> > Andrii
> >
> >
> > 2016-10-31 15:45 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
> >>
> >> If you call a transformation on an rdd using the same partitioner as
> that
> >> rdd, no shuffle will occur.  KafkaRDD doesn't have a partitioner,
> there's no
> >> consistent partitioning scheme that works for all kafka uses. You can
> wrap
> >> each kafkardd with an rdd that has a custom partitioner that you write
> to
> >> match your kafka partitioning scheme, and avoid a shuffle.
> >>
> >> The danger there is if you have any misbehaving producers, or translate
> >> the partitioning wrongly, you'll get bad results. It's safer just to
> >> shuffle.
> >>
> >>
> >> On Oct 31, 2016 04:31, "Andrii Biletskyi"
> >> <andrii.bilets...@yahoo.com.invalid> wrote:
> >>
> >> Hi all,
> >>
> >> I'm using Spark Streaming mapWithState operation to do a stateful
> >> operation on my Kafka stream (though I think similar arguments would
> apply
> >> for any source).
> >>
> >> Trying to understand a way to control mapWithState's partitioning
> schema.
> >>
> >> My transformations are simple:
> >>
> >> 1) create KafkaDStream
> >> 2) mapPartitions to get a key-value stream where `key` corresponds to
> >> Kafka message key
> >> 3) apply mapWithState operation on key-value stream, the state stream
> >> shares keys with the original stream, the resulting streams doesn't
> change
> >> keys either
> >>
> >> The problem is that, as I understand, mapWithState stream has a
> different
> >> partitioning schema and thus I see shuffles in Spark Web UI.
> >>
> >> From the mapWithState implementation I see that:
> >> mapwithState uses Partitioner if specified, otherwise partitions data
> with
> >> HashPartitioner(). The thing is that original
> >> KafkaDStream has a specific partitioning schema: Kafka partitions
> correspond
> >> Spark RDD partitions.
> >>
> >> Question: is there a way for mapWithState stream to inherit partitioning
> >> schema from the original stream (i.e. correspond to Kafka partitions).
> >>
> >> Thanks,
> >> Andrii
> >>
> >>
> >
>


Re: MapWithState partitioning

2016-10-31 Thread Cody Koeninger
You may know that those streams share the same keys, but Spark doesn't
unless you tell it.

mapWithState takes a StateSpec, which should allow you to specify a partitioner.

On Mon, Oct 31, 2016 at 9:40 AM, Andrii Biletskyi <andrb...@gmail.com> wrote:
> Thanks for response,
>
> So as I understand there is no way to "tell" mapWithState leave the
> partitioning schema as any other transformation would normally do.
> Then I would like to clarify if there is a simple way to do a transformation
> to a key-value stream and specify somehow the Partitioner that effectively
> would result in the same partitioning schema as the original stream.
> I.e.:
>
> stream.mapPartitions({ crs =>
>   crs.map { cr =>
> cr.key() -> cr.value()
>   }
> }) <--- specify somehow Partitioner here for the resulting rdd.
>
>
> The reason I ask is that it simply looks strange to me that Spark will have
> to shuffle each time my input stream and "state" stream during the
> mapWithState operation when I now for sure that those two streams will
> always share same keys and will not need access to others partitions.
>
> Thanks,
> Andrii
>
>
> 2016-10-31 15:45 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>>
>> If you call a transformation on an rdd using the same partitioner as that
>> rdd, no shuffle will occur.  KafkaRDD doesn't have a partitioner, there's no
>> consistent partitioning scheme that works for all kafka uses. You can wrap
>> each kafkardd with an rdd that has a custom partitioner that you write to
>> match your kafka partitioning scheme, and avoid a shuffle.
>>
>> The danger there is if you have any misbehaving producers, or translate
>> the partitioning wrongly, you'll get bad results. It's safer just to
>> shuffle.
>>
>>
>> On Oct 31, 2016 04:31, "Andrii Biletskyi"
>> <andrii.bilets...@yahoo.com.invalid> wrote:
>>
>> Hi all,
>>
>> I'm using Spark Streaming mapWithState operation to do a stateful
>> operation on my Kafka stream (though I think similar arguments would apply
>> for any source).
>>
>> Trying to understand a way to control mapWithState's partitioning schema.
>>
>> My transformations are simple:
>>
>> 1) create KafkaDStream
>> 2) mapPartitions to get a key-value stream where `key` corresponds to
>> Kafka message key
>> 3) apply mapWithState operation on key-value stream, the state stream
>> shares keys with the original stream, the resulting streams doesn't change
>> keys either
>>
>> The problem is that, as I understand, mapWithState stream has a different
>> partitioning schema and thus I see shuffles in Spark Web UI.
>>
>> From the mapWithState implementation I see that:
>> mapwithState uses Partitioner if specified, otherwise partitions data with
>> HashPartitioner(). The thing is that original
>> KafkaDStream has a specific partitioning schema: Kafka partitions correspond
>> Spark RDD partitions.
>>
>> Question: is there a way for mapWithState stream to inherit partitioning
>> schema from the original stream (i.e. correspond to Kafka partitions).
>>
>> Thanks,
>> Andrii
>>
>>
>




Re: MapWithState partitioning

2016-10-31 Thread Cody Koeninger
If you call a transformation on an rdd using the same partitioner as that
rdd, no shuffle will occur.  KafkaRDD doesn't have a partitioner, there's
no consistent partitioning scheme that works for all kafka uses. You can
wrap each kafkardd with an rdd that has a custom partitioner that you write
to match your kafka partitioning scheme, and avoid a shuffle.

The danger there is if you have any misbehaving producers, or translate the
partitioning wrongly, you'll get bad results. It's safer just to shuffle.

On Oct 31, 2016 04:31, "Andrii Biletskyi"
<andrii.bilets...@yahoo.com.invalid> wrote:

Hi all,

I'm using Spark Streaming mapWithState operation to do a stateful operation
on my Kafka stream (though I think similar arguments would apply for any
source).

Trying to understand a way to control mapWithState's partitioning schema.

My transformations are simple:

1) create KafkaDStream
2) mapPartitions to get a key-value stream where `key` corresponds to Kafka
message key
3) apply mapWithState operation on key-value stream, the state stream
shares keys with the original stream, the resulting streams doesn't change
keys either

The problem is that, as I understand, mapWithState stream has a different
partitioning schema and thus I see shuffles in Spark Web UI.

From the mapWithState implementation I see that:
mapwithState uses Partitioner if specified, otherwise partitions data with
HashPartitioner(). The thing is that original
KafkaDStream has a specific partitioning schema: Kafka partitions
correspond Spark RDD partitions.

Question: is there a way for mapWithState stream to inherit partitioning
schema from the original stream (i.e. correspond to Kafka partitions).

Thanks,
Andrii


MapWithState partitioning

2016-10-31 Thread Andrii Biletskyi
Hi all,

I'm using Spark Streaming mapWithState operation to do a stateful operation
on my Kafka stream (though I think similar arguments would apply for any
source).

Trying to understand a way to control mapWithState's partitioning schema.

My transformations are simple:

1) create KafkaDStream
2) mapPartitions to get a key-value stream where `key` corresponds to Kafka
message key
3) apply mapWithState operation on key-value stream, the state stream
shares keys with the original stream, the resulting stream doesn't change
keys either

The problem is that, as I understand, mapWithState stream has a different
partitioning schema and thus I see shuffles in Spark Web UI.

From the mapWithState implementation I see that:
mapWithState uses the Partitioner if specified, otherwise partitions data with
HashPartitioner(). The thing is that the original
KafkaDStream has a specific partitioning schema: Kafka partitions
correspond to Spark RDD partitions.

Question: is there a way for the mapWithState stream to inherit the partitioning
schema from the original stream (i.e. correspond to Kafka partitions)?

Thanks,
Andrii


Re: Re-partitioning mapwithstateDstream

2016-10-13 Thread manasdebashiskar
StateSpec has a method numPartitions to set the initial number of partitions.

That should do the trick.

...Manas
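
For completeness, a minimal sketch of that API in Scala; mappingFunc and
keyedStream are placeholders for the stateful function and keyed DStream you
already have:

import org.apache.spark.streaming.StateSpec

// numPartitions is shorthand for partitioning the state RDDs with a
// HashPartitioner of that size.
val spec = StateSpec.function(mappingFunc).numPartitions(20)
val stateStream = keyedStream.mapWithState(spec)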



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Re-partitioning-mapwithstateDstream-tp27880p27899.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Partitioning in spark

2016-06-24 Thread Darshan Singh
Thanks, but the whole point is not to set it explicitly; it should be
derived from its parent RDDs.

Thanks

On Fri, Jun 24, 2016 at 6:09 AM, ayan guha  wrote:

> You can change parallelism like the following:
>
> conf = SparkConf()
> conf.set('spark.sql.shuffle.partitions',10)
> sc = SparkContext(conf=conf)
>
>
>
> On Fri, Jun 24, 2016 at 6:46 AM, Darshan Singh 
> wrote:
>
>> Hi,
>>
>> My default parallelism is 100. Now I join 2 dataframes with 20 partitions
>> each; the joined dataframe has 100 partitions. I want to know what is the
>> way to keep it at 20 (except repartition and coalesce).
>>
>> Also, when I join these 2 dataframes I am using 4 columns as join columns.
>> The dataframes are partitioned on the first 2 columns of the join, and thus,
>> in effect, each partition should only need to be joined with its
>> corresponding partition and not with the rest of the partitions, so why is
>> Spark shuffling all the data?
>>
>> Similarly, when my dataframe is partitioned by col1,col2 and I use group by
>> on col1,col2,col3,col4, why does it shuffle everything, whereas it only
>> needs to sort each partition and then do the grouping there itself?
>>
>> A bit confusing; I am using 1.5.1.
>>
>> Is it fixed in future versions?
>>
>> Thanks
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Partitioning in spark

2016-06-23 Thread ayan guha
You can change parallelism like the following:

conf = SparkConf()
conf.set('spark.sql.shuffle.partitions',10)
sc = SparkContext(conf=conf)



On Fri, Jun 24, 2016 at 6:46 AM, Darshan Singh 
wrote:

> Hi,
>
> My default parallelism is 100. Now I join 2 dataframes with 20 partitions
> each; the joined dataframe has 100 partitions. I want to know what is the
> way to keep it at 20 (except repartition and coalesce).
>
> Also, when I join these 2 dataframes I am using 4 columns as join columns.
> The dataframes are partitioned on the first 2 columns of the join, and thus,
> in effect, each partition should only need to be joined with its
> corresponding partition and not with the rest of the partitions, so why is
> Spark shuffling all the data?
>
> Similarly, when my dataframe is partitioned by col1,col2 and I use group by
> on col1,col2,col3,col4, why does it shuffle everything, whereas it only
> needs to sort each partition and then do the grouping there itself?
>
> A bit confusing; I am using 1.5.1.
>
> Is it fixed in future versions?
>
> Thanks
>



-- 
Best Regards,
Ayan Guha


Partitioning in spark

2016-06-23 Thread Darshan Singh
Hi,

My default parallelism is 100. Now I join 2 dataframes with 20 partitions
each; the joined dataframe has 100 partitions. I want to know what is the way
to keep it at 20 (except repartition and coalesce).

Also, when I join these 2 dataframes I am using 4 columns as join columns.
The dataframes are partitioned on the first 2 columns of the join, and thus,
in effect, each partition should only need to be joined with its corresponding
partition and not with the rest of the partitions, so why is Spark shuffling
all the data?

Similarly, when my dataframe is partitioned by col1,col2 and I use group by on
col1,col2,col3,col4, why does it shuffle everything, whereas it only needs to
sort each partition and then do the grouping there itself?

A bit confusing; I am using 1.5.1.

Is it fixed in future versions?

Thanks


Re: Custom positioning/partitioning Dataframes

2016-06-03 Thread Takeshi Yamamuro
Hi,

I'm afraid Spark has no explicit API to set custom partitioners on DataFrames
for now.

// maropu

On Sat, Jun 4, 2016 at 1:09 AM, Nilesh Chakraborty <nil...@nileshc.com>
wrote:

> Hi,
>
> I have a domain-specific schema (RDF data with vertical partitioning, ie.
> one table per property) and I want to instruct SparkSQL to keep
> semantically
> closer property tables closer together, that is, group dataframes together
> into different nodes (or at least encourage it somehow) so that tables that
> are most frequently joined together are located locally together.
>
> Any thoughts on how I can do this with Spark? Any internal hack ideas are
> welcome too. :)
>
> Cheers,
> Nilesh
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Custom-positioning-partitioning-Dataframes-tp27084.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


-- 
---
Takeshi Yamamuro


Re: Strategies for properly load-balanced partitioning

2016-06-03 Thread Takeshi Yamamuro
Hi,

You can control this kind of issue in the coming v2.0.
See https://www.mail-archive.com/user@spark.apache.org/msg51603.html

// maropu


On Sat, Jun 4, 2016 at 10:23 AM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

> Hi Saif!
>
>
>
> When you say this happens with spark-csv, are the files gzipped by any
> chance? GZip is non-splittable so if you’re seeing skew simply from loading
> data it could be you have some extremely large gzip files. So for a single
> stage job you will have those tasks lagging compared to the smaller gzips.
> As you already said, the option there would be to repartition at the
> expense of shuffling. If you’re seeing this with parquet files, what do the
> individual part-* files look like (size, compression type, etc.)?
>
>
>
> Thanks,
>
> Silvio
>
>
>
> *From: *"saif.a.ell...@wellsfargo.com" <saif.a.ell...@wellsfargo.com>
> *Date: *Friday, June 3, 2016 at 8:31 AM
> *To: *"user@spark.apache.org" <user@spark.apache.org>
> *Subject: *Strategies for properly load-balanced partitioning
>
>
>
> Hello everyone!
>
> I was noticing that, when reading parquet files or actually any kind of
> source data frame data (spark-csv, etc.), the default partitioning is not
> fair. Tasks in an action usually run very fast on some partitions and very
> slowly on some others, and frequently run fast on all but the last partition
> (which looks like it reads 50%+ of the data input size).
>
> I notice that each task loads some portion of the data, say 1024 MB chunks,
> while some tasks load 20+ GB of data.
>
> Applying repartition strategies solves this issue properly and general
> performance increases considerably, but for very large dataframes
> repartitioning is a costly process.
>
> In short, what are the available strategies or configurations that help
> reading from disk or HDFS with a proper executor data distribution?
>
> If this needs to be more specific, I am strictly focused on PARQUET files
> from HDFS. I know there are some MIN
>
> Really appreciate it,
> Saif
> Saif
>
>
>



-- 
---
Takeshi Yamamuro


Re: Strategies for properly load-balanced partitioning

2016-06-03 Thread Silvio Fiorito
Hi Saif!

When you say this happens with spark-csv, are the files gzipped by any chance? 
GZip is non-splittable so if you’re seeing skew simply from loading data it 
could be you have some extremely large gzip files. So for a single stage job 
you will have those tasks lagging compared to the smaller gzips. As you already 
said, the option there would be to repartition at the expense of shuffling. If 
you’re seeing this with parquet files, what do the individual part-* files look 
like (size, compression type, etc.)?

Thanks,
Silvio
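
To make the workaround above concrete, a minimal sketch, assuming Spark 1.x
with the spark-csv package; the path, the header option and the 200 are
made-up values to adapt:

// Each .csv.gz file arrives as exactly one partition because gzip is not
// splittable, so very large files become straggler tasks. An explicit
// repartition right after the read evens out the load, at the cost of a
// shuffle.
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("hdfs:///data/events/*.csv.gz")
  .repartition(200)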

From: "saif.a.ell...@wellsfargo.com" <saif.a.ell...@wellsfargo.com>
Date: Friday, June 3, 2016 at 8:31 AM
To: "user@spark.apache.org" <user@spark.apache.org>
Subject: Strategies for properly load-balanced partitioning

Hello everyone!

I was noticing that, when reading parquet files or actually any kind of source
data frame data (spark-csv, etc.), the default partitioning is not fair.
Tasks in an action usually run very fast on some partitions and very slowly on
some others, and frequently run fast on all but the last partition (which looks
like it reads 50%+ of the data input size).

I notice that each task loads some portion of the data, say 1024 MB chunks,
while some tasks load 20+ GB of data.

Applying repartition strategies solves this issue properly and general
performance increases considerably, but for very large dataframes
repartitioning is a costly process.

In short, what are the available strategies or configurations that help reading
from disk or HDFS with a proper executor data distribution?

If this needs to be more specific, I am strictly focused on PARQUET files from
HDFS. I know there are some MIN

Really appreciate it,
Saif



RE: Strategies for properly load-balanced partitioning

2016-06-03 Thread Saif.A.Ellafi
Appreciate the follow up.

I am not entirely sure how or why my question is related to bucketing
capabilities. It indeed sounds like a powerful feature to avoid shuffling, but
in my case I am referring to the straightforward process of reading data and
writing it to parquet.
If bucketed tables allow buckets to be set up before reading, and let me
specify the parallelism when writing directly, then you hit the nail on the
head.

My problem is that reading from the source (usually hundreds of text files)
turns into dataframes with 10k+ partitions, based on the block size and the
number of data splits. Writing these back is a huge overhead for parquet and
requires repartitioning in order to reduce heap memory usage, especially on
wide tables.

Let's see how it goes.
Saif
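
As a sketch of the pattern being described (Spark 1.6 Scala API; the paths,
the 200 and sqlContext are assumptions):

// Hundreds of small text files typically come in as many thousands of
// partitions; collapsing them before the write avoids producing thousands
// of tiny parquet files and reduces per-task overhead.
val raw = sqlContext.read.text("hdfs:///landing/source_tables/*")

raw.coalesce(200)      // avoids a full shuffle, unlike repartition(200)
   .write
   .parquet("hdfs:///curated/source_tables_parquet")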


From: Ovidiu-Cristian MARCU [mailto:ovidiu-cristian.ma...@inria.fr]
Sent: Friday, June 03, 2016 2:55 PM
To: Ellafi, Saif A.
Cc: user; Reynold Xin; mich...@databricks.com
Subject: Re: Strategies for properly load-balanced partitioning

I suppose you are running on 1.6.
I guess you need some solution based on [1], [2] features which are coming in 
2.0.

[1] https://issues.apache.org/jira/browse/SPARK-12538 / 
https://issues.apache.org/jira/browse/SPARK-12394
[2] https://issues.apache.org/jira/browse/SPARK-12849

However, I did not check for examples; I would like to add to your question and
ask the community to link to some examples with the recent improvements/changes.

It could help, however, to give a concrete example of your specific problem, as
you may also be hitting some stragglers, probably caused by data skew.

Best,
Ovidiu


On 03 Jun 2016, at 17:31, saif.a.ell...@wellsfargo.com wrote:

Hello everyone!

I was noticing that, when reading parquet files or actually any kind of source
data frame data (spark-csv, etc.), the default partitioning is not fair.
Tasks in an action usually run very fast on some partitions and very slowly on
some others, and frequently run fast on all but the last partition (which looks
like it reads 50%+ of the data input size).

I notice that each task loads some portion of the data, say 1024 MB chunks,
while some tasks load 20+ GB of data.

Applying repartition strategies solves this issue properly and general
performance increases considerably, but for very large dataframes
repartitioning is a costly process.

In short, what are the available strategies or configurations that help reading
from disk or HDFS with a proper executor data distribution?

If this needs to be more specific, I am strictly focused on PARQUET files from
HDFS. I know there are some MIN

Really appreciate it,
Saif



Re: Strategies for properly load-balanced partitioning

2016-06-03 Thread Ovidiu-Cristian MARCU
I suppose you are running on 1.6.
I guess you need some solution based on [1], [2] features which are coming in 
2.0.

[1] https://issues.apache.org/jira/browse/SPARK-12538 /
https://issues.apache.org/jira/browse/SPARK-12394
[2] https://issues.apache.org/jira/browse/SPARK-12849


However, I did not check for examples; I would like to add to your question and
ask the community to link to some examples with the recent improvements/changes.

It could help, however, to give a concrete example of your specific problem, as
you may also be hitting some stragglers, probably caused by data skew.

Best,
Ovidiu
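
For reference, a minimal sketch of the bucketed-table API that shipped with
Spark 2.0 (assuming that is the feature tracked by the JIRAs above); the table
and column names are made up:

// Buckets are fixed at write time and recorded in the metastore, so later
// reads that join or aggregate on the bucket column can reuse the layout
// instead of reshuffling.
df.write
  .bucketBy(64, "customer_id")
  .sortBy("customer_id")
  .saveAsTable("events_bucketed")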


> On 03 Jun 2016, at 17:31, saif.a.ell...@wellsfargo.com wrote:
> 
> Hello everyone!
>  
> I was noticing that, when reading parquet files or actually any kind of
> source data frame data (spark-csv, etc.), the default partitioning is not
> fair. Tasks in an action usually run very fast on some partitions and very
> slowly on some others, and frequently run fast on all but the last partition
> (which looks like it reads 50%+ of the data input size).
>
> I notice that each task loads some portion of the data, say 1024 MB chunks,
> while some tasks load 20+ GB of data.
>
> Applying repartition strategies solves this issue properly and general
> performance increases considerably, but for very large dataframes
> repartitioning is a costly process.
>
> In short, what are the available strategies or configurations that help
> reading from disk or HDFS with a proper executor data distribution?
>
> If this needs to be more specific, I am strictly focused on PARQUET files
> from HDFS. I know there are some MIN
>
> Really appreciate it,
> Saif



Custom positioning/partitioning Dataframes

2016-06-03 Thread Nilesh Chakraborty
Hi,

I have a domain-specific schema (RDF data with vertical partitioning, i.e.
one table per property) and I want to instruct Spark SQL to keep semantically
closer property tables closer together, that is, group dataframes together
onto different nodes (or at least encourage it somehow) so that tables that
are most frequently joined together are located locally together.

Any thoughts on how I can do this with Spark? Any internal hack ideas are
welcome too. :)

Cheers,
Nilesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Custom-positioning-partitioning-Dataframes-tp27084.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Strategies for properly load-balanced partitioning

2016-06-03 Thread Saif.A.Ellafi
Hello everyone!

I was noticing that, when reading parquet files or actually any kind of source
data frame data (spark-csv, etc.), the default partitioning is not fair.
Tasks in an action usually run very fast on some partitions and very slowly on
some others, and frequently run fast on all but the last partition (which looks
like it reads 50%+ of the data input size).

I notice that each task loads some portion of the data, say 1024 MB chunks,
while some tasks load 20+ GB of data.

Applying repartition strategies solves this issue properly and general
performance increases considerably, but for very large dataframes
repartitioning is a costly process.

In short, what are the available strategies or configurations that help reading
from disk or HDFS with a proper executor data distribution?

If this needs to be more specific, I am strictly focused on PARQUET files from
HDFS. I know there are some MIN

Really appreciate it,
Saif



Partitioning Data to optimize combineByKey

2016-06-02 Thread Nathan Case
Hello,

I am trying to process a dataset that is approximately 2 TB using a cluster
with 4.5 TB of RAM.  The data is in parquet format and is initially loaded
into a dataframe.  A subset of the data is then queried for and converted
to an RDD for more complicated processing.  The first stage of that processing
is a mapToPair to use each row's id as the key in a tuple.  Then the data
goes through a combineByKey operation to group all values with the same
key.  This operation always exceeds the maximum cluster memory and the job
eventually fails.  While it is shuffling there are a lot of "spilling
in-memory map to disk" messages.  I am wondering whether, if I were to have
the data initially partitioned such that all the rows with the same id
resided within the same partition, it would need to do less shuffling and
perform correctly.

To do the initial load I am using:

sqlContext.read().parquet(inputPathArray).repartition(1, new Column("id"));

I am not sure if this is the correct way to partition a dataframe, so my
first question is: is the above correct?

My next question is that when I go from the dataframe to an RDD using:

JavaRDD locationsForSpecificKey = sqlc.sql(
        "SELECT * FROM standardlocationrecords WHERE customerID = " + customerID
        + " AND partnerAppID = " + partnerAppID)
    .toJavaRDD().map(new LocationRecordFromRow()::apply);

is the partitioning scheme from the dataframe preserved, or do I need to
repartition after doing a mapToPair, using rdd.partitionBy and passing in a
custom HashPartitioner that uses the hash of the ID field?

My goal is to reduce the shuffling when doing the final combineByKey to
prevent the job from running out of memory and failing.  Any help would be
greatly appreciated.

Thanks,
Nathan
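
As an illustration of the pattern described above, a minimal sketch (in Scala
rather than the Java used above, purely for brevity); rowsRdd and
numPartitions are placeholders, and LocationRecord is stubbed out:

import org.apache.spark.HashPartitioner

case class LocationRecord(id: Long /* plus the other fields */)

// Key by id, co-locate with an explicit HashPartitioner, and reuse the
// same partitioner for combineByKey so the combine step does not trigger
// a second shuffle.
val partitioner = new HashPartitioner(numPartitions)

val keyed = rowsRdd
  .map(rec => (rec.id, rec))     // mapToPair equivalent
  .partitionBy(partitioner)      // one explicit shuffle by id

// combineByKey with the same partitioner finds the data already
// co-located, so records for a given id never leave their partition.
val grouped = keyed.combineByKey[List[LocationRecord]](
  (rec: LocationRecord) => List(rec),
  (acc: List[LocationRecord], rec: LocationRecord) => rec :: acc,
  (a: List[LocationRecord], b: List[LocationRecord]) => a ::: b,
  partitioner)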

