Hi Ashley,

I'm not an expert, but I think this is because Spark evaluates lazily:
transformations such as joins and filters only build up a plan, and nothing
is actually executed until you call an action such as a write or a count on
the dataframe.
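
As a rough illustration (a minimal sketch, not your code: demo_lazy is a
made-up helper, and it assumes an existing SparkSession is passed in), the
transformations below only build up a plan, and no job runs until the final
count():

```python
def demo_lazy(spark):
    """Build a small plan lazily; only the final count() launches a job."""
    df = spark.range(1000000)                        # lazy: defines a plan, runs nothing
    evens = df.filter("id % 2 = 0")                  # lazy transformation
    doubled = evens.selectExpr("id * 2 AS doubled")  # lazy transformation
    doubled.explain()                                # prints the physical plan, still no job
    return doubled.count()                           # action: the whole plan runs here
```

If you watch the Spark UI while stepping through this, a job should only
show up once count() is called.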

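Each count() in the middle of the pipeline is an action, so it triggers a
job of its own. If you remove the count steps, Spark should be able to work
out a more efficient execution plan for the final write, with fewer stages
and tasks.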

If you can do the count as a final step, I would do that. I think you may
also not need the .cache() statements, and you might want to experiment
with reducing spark.sql.shuffle.partitions too (the default is 200).
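
Something along these lines is what I had in mind. Treat it as a rough
sketch rather than tested code: write_then_count is a name I made up, it
expects your df_inserts and df_updates built without the intermediate
.cache()/.count() calls, and whether the single .cache() on the combined
frame actually helps is something you would need to measure.

```python
def write_then_count(df_inserts, df_updates, output_path):
    """Write the combined delta once, then get both counts from one extra job."""
    # Combine first so Spark plans the inserts and updates together.
    # Assumption: one cache on the union, which the write below fills.
    df_output = df_inserts.union(df_updates).cache()

    # First action: the write runs the joins and populates the cache.
    df_output.repartition(1).write.format("parquet").mode("overwrite").save(output_path)

    # Second action: a single aggregation gives the count per _action value.
    rows = df_output.groupBy("_action").count().collect()
    counts = {row["_action"]: row["count"] for row in rows}

    df_output.unpersist()
    # An action with no rows is simply absent from the result, so default to 0.
    return counts.get("Insert", 0), counts.get("Update", 0)
```

You would call it as
inserts_count, updates_count = write_then_count(df_inserts, df_updates, '/path/to/output.parquet').
For the shuffle setting, something like
spark_session.conf.set('spark.sql.shuffle.partitions', '12') before the
joins is the kind of thing I mean; 12 is only a guess at a starting point
for 6 cores, not a recommendation.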

Thanks
Dave

On Thu, 13 Feb 2020, 04:09 Ashley Hoff, <ash.j.h...@gmail.com> wrote:

> Hi,
>
> I am currently working on a PySpark app that produces a daily delta
> capture of inserts and updates, written out as Parquet.  It runs on an
> 8-core, 32 GB Linux server in standalone mode (set to 6 worker cores of
> 2 GB memory each) running Spark 2.4.3.
>
> This is achieved by reading data from a TSQL database into a dataframe,
> appending a hash of each record to it, and comparing it to a dataframe of
> yesterday's data (which has also been saved as Parquet).
>
> As part of the monitoring and logging, I am trying to count the number of
> records for the respective actions.  Example code:
>
> df_source = spark_session.read.format('jdbc').....
> df_reference = sql_context.read.parquet('/path/to/reference.parquet')
>
> df_source_hashed = df_source.withColumn('hashkey', md5(concat_ws('', *df_source.columns))) \
>             .cache()
>
> df_inserts = df_source_hashed.join(df_reference, pk_list, how='left_anti') \
>                     .select(lit('Insert').alias('_action'), *df_source_hashed) \
>                     .dropDuplicates() \
>                     .cache()
> inserts_count = df_inserts.count()
>
> df_updates = df_source_hashed.alias('a').join(df_reference.alias('b'), pk_list, how="inner") \
>                         .select(lit('Update').alias('_action'), *df_source_hashed) \
>                         .where(col('a.hashkey') != col('b.hashkey')) \
>                         .dropDuplicates() \
>                         .cache()
> updates_count = df_updates.count()
>
> df_output = df_inserts.union(df_updates)
>
> df_output.repartition(1).write.format('parquet').mode('overwrite').save('/path/to/output.parquet')
>
> Two instances of the above code run concurrently via Python
> threading.Thread (this is to try to overcome the network bottleneck
> connecting to the database server).
>
> What I am finding is I am getting some very inconsistent behavior with the
> counts.  Occasionally, it appears that it will freeze up on a count
> operation for a few minutes and quite often that specific data frame will
> have zero records in it.  According to the DAG (which I am not 100% sure
> how to read) the following is the processing flow:
>
> Exchange/ShuffledRowRDD [74]count at NativeMethodAccessorImpl.java:0  =>
> WholeStageCodegen/MapPartitionsRDD [75]count at
> NativeMethodAccessorImpl.java:0  => InMemoryTableScan/MapPartitionsRDD
> [78]count at NativeMethodAccessorImpl.java:0 => MapPartitionsRDD [79]count
> at NativeMethodAccessorImpl.java:0 => WholeStageCodegen/MapPartitionsRDD
> [80]count at NativeMethodAccessorImpl.java:0 => Exchange/MapPartitionsRDD
> [81]count at NativeMethodAccessorImpl.java:0
>
> The other observation I have made is that if I remove the counts from the
> data frame operations and instead open the output parquet file and count
> using a
> `sql_context.read.load('/path/to/output.parquet').filter(col("_action") == "Insert").count()`
> command, I reduce my run-times by around 20 to 30%.  In my feeble mind,
> opening up the outputs and re-reading them seems counter-intuitive.
>
> Is anyone able to give me some guidance on why or how to ensure that I am
> doing the above as efficiently as possible?
>
> Best Regards
> Ashley
>
