Hi Ashley,

Apologies, I'm reading this on my phone as my work laptop doesn't let me access
personal email.

Are you actually doing anything with the counts (printing to a log, writing
to a table)?

If you're not doing anything with them, get rid of them and the caches
entirely.

If you do want to do something with the counts you could try removing the
individual counts and caches.

Put a single cache on df_output:

df_output = df_inserts.union(df_updates).cache()

Then output a count grouped by _action on this df before writing out the parquet.

Hope that helps

Dave


On Thu, 13 Feb 2020, 06:09 Ashley Hoff, <ash.j.h...@gmail.com> wrote:

> Thanks David,
>
> I did experiment with the .cache() keyword and have to admit I didn't see
> any marked improvement on the sample that I was running, so yes I am a bit
> apprehensive including it (not even sure why I actually left it in).
>
> When you say "do the count as the final step", are you referring to
> getting the counts of the individual data frames, or from the already
> outputted parquet?
>
> Thanks and I appreciate your reply
>
> On Thu, Feb 13, 2020 at 4:15 PM David Edwards <edwardsdj...@googlemail.com>
> wrote:
>
>> Hi Ashley,
>>
>> I'm not an expert, but I think this is because Spark uses lazy execution and
>> doesn't actually do any work until you call an action such as a write or a
>> count on the dataframe.
>>
>> If you remove the count steps it will work out a more efficient execution
>> plan reducing the number of task steps.
>>
>> If you can do the count as a final step I would do that. I think you may
>> also not need the .cache() statements, and you might want to experiment with
>> reducing spark.sql.shuffle.partitions too.
>>
>> Thanks
>> Dave
>>
>> On Thu, 13 Feb 2020, 04:09 Ashley Hoff, <ash.j.h...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am currently working on an app using PySpark to produce a daily insert and
>>> update delta capture, output as Parquet.  This is running on
>>> an 8-core, 32 GB Linux server in standalone mode (set to 6 worker cores of
>>> 2GB memory each) running Spark 2.4.3.
>>>
>>> This is achieved by reading data from a TSQL database into a
>>> dataframe, appending a hash of all columns in each record, and comparing it
>>> to a dataframe of yesterday's data (which has also been saved as
>>> parquet).
>>>
>>> As part of the monitoring and logging, I am trying to count the number
>>> of records for the respective actions.  Example code:
>>>
>>> df_source = spark_session.read.format('jdbc').....
>>> df_reference = sql_context.read.parquet('/path/to/reference.parquet')
>>>
>>> df_source_hashed = df_source.withColumn('hashkey', md5(concat_ws('', *df_source.columns))) \
>>>             .cache()
>>>
>>> df_inserts = df_source_hashed.join(df_reference, pk_list, how='left_anti') \
>>>                     .select(lit('Insert').alias('_action'), *df_source_hashed) \
>>>                     .dropDuplicates() \
>>>                     .cache()
>>> inserts_count = df_inserts.count()
>>>
>>> df_updates = df_source_hashed.alias('a').join(df_reference.alias('b'), pk_list, how="inner") \
>>>                         .select(lit('Update').alias('_action'), *df_source_hashed) \
>>>                         .where(col('a.hashkey') != col('b.hashkey')) \
>>>                         .dropDuplicates() \
>>>                         .cache()
>>> updates_count = df_updates.count()
>>>
>>> df_output = df_inserts.union(df_updates)
>>>
>>> df_output.repartition(1).write.format('parquet').mode('overwrite').save('/path/to/output.parquet')
>>>
>>> The above code is run as two concurrent instances via Python
>>> threading.Thread (this is to try to overcome the network bottleneck
>>> connecting to the database server).
>>>
>>> What I am finding is I am getting some very inconsistent behavior with
>>> the counts.  Occasionally, it appears that it will freeze up on a count
>>> operation for a few minutes and quite often that specific data frame will
>>> have zero records in it.  According to the DAG (which I am not 100% sure
>>> how to read) the following is the processing flow:
>>>
>>> Exchange/ShuffledRowRDD [74]count at NativeMethodAccessorImpl.java:0  =>
>>> WholeStageCodegen/MapPartitionsRDD [75]count at
>>> NativeMethodAccessorImpl.java:0  => InMemoryTableScan/MapPartitionsRDD
>>> [78]count at NativeMethodAccessorImpl.java:0 => MapPartitionsRDD [79]count
>>> at NativeMethodAccessorImpl.java:0 => WholeStageCodegen/MapPartitionsRDD
>>> [80]count at NativeMethodAccessorImpl.java:0 => Exchange/MapPartitionsRDD
>>> [81]count at NativeMethodAccessorImpl.java:0
>>>
>>> The other observation is that if I remove the counts from the
>>> data frame operations and instead open the output parquet file and
>>> count using a
>>> `sql_context.read.load('/path/to/output.parquet').filter(col("_action") ==
>>> "Insert").count()` command, I am reducing my run-times by around 20 to
>>> 30%.  In my feeble mind, opening up the outputs and re-reading them seems
>>> counter-intuitive.
>>>
>>> Is anyone able to give me some guidance on why or how to ensure that I
>>> am doing the above as efficiently as possible?
>>>
>>> Best Regards
>>> Ashley
>>>
>>
>
> --
> Kustoms On Silver <https://www.facebook.com/KustomsOnSilver>
>
