Re: test

2020-07-27 Thread Ashley Hoff
Yes, your emails are getting through.

On Mon, Jul 27, 2020 at 6:31 PM Suat Toksöz  wrote:

> user@spark.apache.org
>
> --
>
> Best regards,
>
> *Suat Toksoz*
>


-- 
Kustoms On Silver 


Re: [Announcement] Cloud data lake conference with heavy focus on open source

2020-07-07 Thread Ashley Hoff
Interesting!  You've piqued my interest.  Will the sessions be available
after the conference?  (I'm in the wrong timezone to see this during
daylight hours.)

On Wed, Jul 8, 2020 at 2:40 AM ldazaa11  wrote:

> Hello Sparkers,
>
> If you’re interested in how Spark is being applied in cloud data lake
> environments, then you should check out a new 1-day LIVE, virtual
> conference on July 30. This conference is called Subsurface and the focus
> is technical talks tailored specifically for data architects and engineers
> building cloud data lakes and related technologies.
>
> Here are some of the speakers presenting at the event:
>
> Wes McKinney - Director at Ursa Labs, Pandas Creator and Apache Arrow
> co-creator.
> Maxime Beauchemin - CEO and Founder, Preset. Apache Superset and Airflow
> Creator.
> Julien Le Dem - Co-founder and CTO at Datakin. Apache Parquet Co-creator.
> Daniel Weeks - Big Data Compute Team Lead, Netflix - Parquet Committer and
> Hive Contributor.
>
>
> You can join here (there’s no cost):
> https://subsurfaceconf.com/summer2020?utm_medium=website_source=open-source_term=na_content=na_campaign=2020-subsurface-summer
>
> The Subsurface Team
> @subsurfaceconf
>
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Kustoms On Silver 


Re: wot no toggle ?

2020-04-16 Thread Ashley Hoff
OK, we get it.  You are not satisfied that Spark is easy enough for mere
mortals to use.

Please stop.

Maybe you should look at Databricks?

On Thu, Apr 16, 2020 at 3:43 PM jane thorpe 
wrote:

> https://spark.apache.org/docs/3.0.0-preview/web-ui.html#storage-tab
>
> In one of the screenshots at that link there are two checkboxes:
> ON HEAP MEMORY
> OFF HEAP MEMORY.
>
> That is about as useful as Barry Humphries wearing a gold dress as Dame
> Edna Everage.
>
> Which monkey came up with that?
> None of the monkeys here noticed that?
>
> Ever heard of a toggle switch?
> Look behind you.
> Look at the light switch.
> That is a toggle switch: ON/OFF.
>
>
> Jane thorpe
> janethor...@aol.com
>






Re: Questions about count() performance with dataframes and parquet files

2020-02-13 Thread Ashley Hoff
Hi,

Thank you both for your suggestions!  These have been eye-openers for me.

Just to clarify, I need the counts for logging and auditing purposes,
otherwise I would exclude the step.  I should have also mentioned that
while I am processing around 30 GB of raw data, the individual outputs are
relatively small - in total across all files around 30 MB.

Firstly, I have identified the issue, and it has nothing to do with Spark.
As part of my testing regime, I do a large file copy operation to stage
data (copying around 6 GB from one directory to the next).  Once that is
done, I kick off my process.  Looking at Cockpit, I noticed that after the
copy command had completed, there was a good 60+ seconds of intensive disk
IO.  In previous testing, this IO was still occurring when the first stages
of my script were running.  I've now extensively re-tested after letting
this IO drop off and I am no longer getting these freezes.  In reality
there isn't any benefit in the real world (i.e., waiting 90 seconds to save
60 seconds), but at least I can explain why.

David, by far your suggestion of reducing the shuffle partitions has
absolutely smashed the run times out of the park.  I've reduced the
shuffle partitions down to the configured number of workers (in this case
6) and I am seeing another 20% come off my run times.  I have now hit a
firm bottleneck around the network and getting the data from the database
server (there was not much difference between 30 shuffle partitions and 6).
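
For reference, here's roughly how I'm pinning it (a minimal sketch only; the
app name is just a placeholder and the value 6 simply matches the configured
worker cores):

from pyspark.sql import SparkSession

# Pin shuffle partitions to the 6 worker cores instead of the default 200,
# which is overkill for ~30 MB of output.
spark_session = SparkSession.builder \
    .appName('delta-capture') \
    .config('spark.sql.shuffle.partitions', 6) \
    .getOrCreate()

# It can also be changed on an existing session:
spark_session.conf.set('spark.sql.shuffle.partitions', 6)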

Enrico, I have tried your suggestions and I can see some wins as well.  I
have to re-design and rebuild some of my solution to get them to work.
When this project was started, I was asked to provide single-partition
parquet files (in the same sort of way you would see being output by
Pandas), and so my solution has been built around that.  Partitioning on a
field means that I can't deliver in this way.  Regardless, reading the
parquet back in at the end, even with a filter clause in the statement,
appears to be much quicker than counting from the data frames.  (I now need
to convince the other stakeholders in the project that delivering files the
way Spark intended is the correct method.)
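
For the record, the read-back pattern I'm testing is along these lines (a
rough sketch only, following Enrico's suggestions below; df_output and
sql_context are as in the code further down the thread):

from pyspark.sql.functions import col

# Write the combined output partitioned by the action column, so each action
# ends up in its own sub-directory (_action=Insert/, _action=Update/).
df_output.coalesce(1) \
    .write.partitionBy('_action') \
    .format('parquet').mode('overwrite') \
    .save('/path/to/output.parquet')

# Reading it back with a filter on the partition column only touches the
# matching sub-directory, which is why the counts come back quickly.
df_read = sql_context.read.parquet('/path/to/output.parquet')
inserts_count = df_read.where(col('_action') == 'Insert').count()
updates_count = df_read.where(col('_action') == 'Update').count()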

Thank you both for your input.

Best regards
Ashley



On Fri, Feb 14, 2020 at 4:44 AM Enrico Minack 
wrote:

> Ashley,
>
> I want to suggest a few optimizations. The problem might go away but at
> least performance should improve.
> The freeze problems could have many reasons, the Spark UI SQL pages and
> stages detail pages would be useful. You can send them privately, if you
> wish.
>
> 1. the repartition(1) should be replaced by coalesce(1). The former will
> shuffle all data, while the latter will read in the existing partitions and
> not shuffle them again.
> 2. Repartitioning to a single partition is discouraged, unless you can
> guarantee the data fit into one worker's memory.
> 3. You can compute Insert and Update in one go, so that you don't have to
> join with df_reference twice.
>
> df_actions = df_source_hashed.alias('a') \
>     .join(df_reference.alias('b'), pk_list, how="left") \
>     .withColumn('_action',
>                 when(col('b.hashkey').isNull(), 'Insert')
>                 .when(col('a.hashkey') != col('b.hashkey'), 'Update')) \
>     .select(col('_action'), *df_source_hashed) \
>     .dropDuplicates() \
>     .cache()
>
> Since df_actions is cached, you can count inserts and updates quickly
> with only that one join in df_actions:
>
> inserts_count = df_actions.where(col('_action') == 'Insert').count()
> updates_count = df_actions.where(col('_action') == 'Update').count()
>
> And you can get rid of the union:
>
> df_output = df_actions.where(col('_action').isNotNull())
>
> If you have to write that output to parquet anyway, then you can get the
> count quickly from the parquet file if it is partitioned by the _action
> column (Spark then only looks into parquet's metadata to get the count, it
> does not read any row):
>
> df_output.repartition(1).write.partitionBy('_action') \
>     .format('parquet').mode('overwrite').save('/path/to/output.parquet')
> df_output = sql_context.read.parquet('/path/to/output.parquet')
> inserts_count = df_output.where(col('_action') == 'Insert').count()
> updates_count = df_output.where(col('_action') == 'Update').count()
>
> These are all just sketches, but I am sure you get the idea.
>
> Enrico
>
>
> On 13.02.20 at 05:08, Ashley Hoff wrote:
>
> Hi,
>
> I am currently working on an app using PySpark to produce an insert and
> update daily delta capture, being outputted as Parquet.  This is running on
> an 8 core 32 GB Linux server in standalone mode (set to 6 worker cores of
> 2GB memory each) running Spark 2.4.3.
>
> This is being achieved by reading i

Re: Questions about count() performance with dataframes and parquet files

2020-02-12 Thread Ashley Hoff
Thanks David,

I did experiment with the .cache() keyword and have to admit I didn't see
any marked improvement on the sample that I was running, so yes, I am a bit
apprehensive about including it (I'm not even sure why I actually left it in).
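
For what it's worth, my understanding (and this is only an assumption, not
something I've measured) is that the cache that should matter most is on the
hashed source frame, since it feeds both the insert and the update joins:

from pyspark.sql.functions import md5, concat_ws

# df_source_hashed is reused by both joins, so caching it should stop the
# JDBC read and the hashing from being repeated for every downstream action.
df_source_hashed = df_source.withColumn('hashkey',
                                        md5(concat_ws('', *df_source.columns))) \
    .cache()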

When you say "do the count as the final step", are you referring to getting
the counts of the individual data frames, or from the already outputted
parquet?

Thanks and I appreciate your reply

On Thu, Feb 13, 2020 at 4:15 PM David Edwards 
wrote:

> Hi Ashley,
>
> I'm not an expert, but I think this is because Spark does lazy execution and
> doesn't actually perform any actions until you do some kind of write, count
> or other operation on the dataframe.
>
> If you remove the count steps it will work out a more efficient execution
> plan, reducing the number of task steps.
>
> If you can do the count as a final step I would do that. I think you may
> also not need the .cache() statements, and you might want to experiment
> with reducing spark.sql.shuffle.partitions too.
>
> Thanks
> Dave
>
>
>
>
>
>
>
>
> On Thu, 13 Feb 2020, 04:09 Ashley Hoff,  wrote:
>
>> Hi,
>>
>> I am currently working on an app using PySpark to produce an insert and
>> update daily delta capture, being outputted as Parquet.  This is running on
>> an 8 core 32 GB Linux server in standalone mode (set to 6 worker cores of
>> 2GB memory each) running Spark 2.4.3.
>>
>> This is being achieved by reading in data from a TSQL database, into a
>> dataframe, which has a hash of all records appended to it and comparing it
>> to a dataframe from yesterday's data (which has also been saved as
>> parquet).
>>
>> As part of the monitoring and logging, I am trying to count the number of
>> records for the respective actions.  Example code:
>>
>> df_source = spark_session.read.format('jdbc').
>> df_reference = sql_context.read.parquet('/path/to/reference.parquet')
>>
>> df_source_hashed = df_source.withColumn('hashkey',
>>                                         md5(concat_ws('', *df_source.columns))) \
>>     .cache()
>>
>> df_inserts = df_source_hashed.join(df_reference, pk_list, how='left_anti') \
>>     .select(lit('Insert').alias('_action'), *df_source_hashed) \
>>     .dropDuplicates() \
>>     .cache()
>> inserts_count = df_inserts.count()
>>
>> df_updates = df_source_hashed.alias('a').join(df_reference.alias('b'),
>>                                               pk_list, how="inner") \
>>     .select(lit('Update').alias('_action'), *df_source_hashed) \
>>     .where(col('a.hashkey') != col('b.hashkey')) \
>>     .dropDuplicates() \
>>     .cache()
>> updates_count = df_updates.count()
>>
>> df_output = df_inserts.union(df_updates)
>>
>> df_output.repartition(1).write.format('parquet').mode('overwrite').save('/path/to/output.parquet')
>>
>> The above code is running two occurrences concurrently via Python
>> threading.Thread (this is to try and overcome the network bottleneck
>> connecting to the database server).
>>
>> What I am finding is I am getting some very inconsistent behavior with
>> the counts.  Occasionally, it appears that it will freeze up on a count
>> operation for a few minutes and quite often that specific data frame will
>> have zero records in it.  According to the DAG (which I am not 100% sure
>> how to read) the following is the processing flow:
>>
>> Exchange/ShuffledRowRDD [74]count at NativeMethodAccessorImpl.java:0  =>
>> WholeStageCodegen/MapPartitionsRDD [75]count at
>> NativeMethodAccessorImpl.java:0  => InMemoryTableScan/MapPartitionsRDD
>> [78]count at NativeMethodAccessorImpl.java:0 => MapPartitionsRDD [79]count
>> at NativeMethodAccessorImpl.java:0 => WholeStageCodegen/MapPartitionsRDD
>> [80]count at NativeMethodAccessorImpl.java:0 => Exchange/MapPartitionsRDD
>> [81]count at NativeMethodAccessorImpl.java:0
>>
>> The other observation I have found is that if I remove the counts from the
>> data frame operations and instead open the outputted parquet file and
>> count using a
>> `sql_context.read.load('/path/to/output.parquet').filter(col("_action") ==
>> "Insert").count()` command, I am reducing my run-times by around 20 to
>> 30%.  In my feeble mind, opening up the outputs and re-reading them seems
>> counter-intuitive.
>>
>> Is anyone able to give me some guidance on why or how to ensure that I am
>> doing the above as efficiently as possible?
>>
>> Best Regards
>> Ashley
>>
>

-- 
Kustoms On Silver <https://www.facebook.com/KustomsOnSilver>


Questions about count() performance with dataframes and parquet files

2020-02-12 Thread Ashley Hoff
Hi,

I am currently working on an app using PySpark to produce an insert and
update daily delta capture, output as Parquet.  This is running on
an 8 core 32 GB Linux server in standalone mode (set to 6 worker cores of
2 GB memory each) running Spark 2.4.3.

This is being achieved by reading data from a TSQL database into a
dataframe, appending a hash of each record to it, and comparing it to a
dataframe from yesterday's data (which has also been saved as Parquet).

As part of the monitoring and logging, I am trying to count the number of
records for the respective actions.  Example code:

df_source = spark_session.read.format('jdbc').
df_reference = sql_context.read.parquet('/path/to/reference.parquet')

df_source_hashed = df_source.withColumn('hashkey',
                                        md5(concat_ws('', *df_source.columns))) \
    .cache()

df_inserts = df_source_hashed.join(df_reference, pk_list, how='left_anti') \
    .select(lit('Insert').alias('_action'), *df_source_hashed) \
    .dropDuplicates() \
    .cache()
inserts_count = df_inserts.count()

df_updates = df_source_hashed.alias('a').join(df_reference.alias('b'),
                                              pk_list, how="inner") \
    .select(lit('Update').alias('_action'), *df_source_hashed) \
    .where(col('a.hashkey') != col('b.hashkey')) \
    .dropDuplicates() \
    .cache()
updates_count = df_updates.count()

df_output = df_inserts.union(df_updates)

df_output.repartition(1).write.format('parquet').mode('overwrite').save('/path/to/output.parquet')

The above code runs two occurrences concurrently via Python
threading.Thread (this is to try and overcome the network bottleneck
connecting to the database server).
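
The threading itself is nothing fancy, roughly along these lines (a
simplified sketch; extract_table and the table names are placeholders, not
the real job code):

import threading

def extract_table(table_name):
    # Each thread runs the full read -> hash -> compare -> write pipeline
    # for one source table, so the two JDBC reads overlap on the network.
    ...

threads = [threading.Thread(target=extract_table, args=(name,))
           for name in ('table_a', 'table_b')]
for t in threads:
    t.start()
for t in threads:
    t.join()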

What I am finding is that I am getting some very inconsistent behavior with the
counts.  Occasionally, it appears that it will freeze up on a count
operation for a few minutes and quite often that specific data frame will
have zero records in it.  According to the DAG (which I am not 100% sure
how to read) the following is the processing flow:

Exchange/ShuffledRowRDD [74] count at NativeMethodAccessorImpl.java:0
  => WholeStageCodegen/MapPartitionsRDD [75] count at NativeMethodAccessorImpl.java:0
  => InMemoryTableScan/MapPartitionsRDD [78] count at NativeMethodAccessorImpl.java:0
  => MapPartitionsRDD [79] count at NativeMethodAccessorImpl.java:0
  => WholeStageCodegen/MapPartitionsRDD [80] count at NativeMethodAccessorImpl.java:0
  => Exchange/MapPartitionsRDD [81] count at NativeMethodAccessorImpl.java:0

The other observation I have found is that if I remove the counts from the
data frame operations and instead open the outputted parquet file and
count using a
`sql_context.read.load('/path/to/output.parquet').filter(col("_action") ==
"Insert").count()` command, I am reducing my run-times by around 20 to
30%.  In my feeble mind, opening up the outputs and re-reading them seems
counter-intuitive.

Is anyone able to give me some guidance on why or how to ensure that I am
doing the above as efficiently as possible?

Best Regards
Ashley