> .dropDuplicates() \
>     .cache()
> Since df_actions is cached, you can count inserts and updates quickly
> with only that one join in df_actions:

Hi Enrico. I am wondering whether this is OK for very large tables. Is
caching faster than recomputing both the inserts and the updates?

Thanks

Enrico Minack <m...@enrico.minack.dev> writes:

> Ashley,
>
> I want to suggest a few optimizations. The problem might go away but
> at least performance should improve.
> The freezes could have many causes; the Spark UI SQL page and the
> stage detail pages would help narrow them down. You can send them
> privately, if you wish.
>
> 1. The repartition(1) should be replaced by coalesce(1). The former
> will shuffle all data, while the latter will read in the existing
> partitions and not shuffle them again.
> 2. Repartitioning to a single partition is discouraged, unless you can
> guarantee the data fits into one worker's memory.
> 3. You can compute Insert and Update in one go, so that you don't have
> to join with df_reference twice.
>
> df_actions = df_source_hashed.alias('a') \
>     .join(df_reference.alias('b'), pk_list, how="left") \
>     .withColumn('_action',
>                 when(col('b.hashkey').isNull(), 'Insert')
>                 .when(col('a.hashkey') != col('b.hashkey'), 'Update')) \
>     .select(col('_action'), *df_source_hashed) \
>     .dropDuplicates() \
>     .cache()
>
> Since df_actions is cached, you can count inserts and updates quickly
> with only that one join in df_actions:
>
> inserts_count = df_actions.where(col('_action') == 'Insert').count()
> updates_count = df_actions.where(col('_action') == 'Update').count()
>
> And you can get rid of the union:
>
> df_output = df_actions.where(col('_action').isNotNull())
>
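To make the single-join idea concrete, here is a minimal plain-Python sketch (no Spark involved) of the same classification on a toy sample. The row layout, the `classify` helper and the sample values are assumptions for illustration only; a missing reference hash plays the role of the null `b.hashkey` after the left join, and `None` marks unchanged rows that the `isNotNull` filter would drop.

```python
from collections import Counter

def classify(source_rows, reference_hashes, pk="id"):
    """Return (action, row) pairs; action is 'Insert', 'Update' or None."""
    classified = []
    for row in source_rows:
        # .get() returns None when the key is absent, like a left join with no match
        ref_hash = reference_hashes.get(row[pk])
        if ref_hash is None:
            action = "Insert"
        elif ref_hash != row["hashkey"]:
            action = "Update"
        else:
            action = None  # unchanged row
        classified.append((action, row))
    return classified

# Hypothetical sample data: yesterday's hashes keyed by primary key, and today's rows
reference_hashes = {1: "h1", 2: "h2"}
source_rows = [
    {"id": 1, "hashkey": "h1"},          # unchanged
    {"id": 2, "hashkey": "h2-changed"},  # hash differs
    {"id": 3, "hashkey": "h3"},          # not in reference
]

df_actions = classify(source_rows, reference_hashes)             # one pass, computed once
df_output = [(a, r) for a, r in df_actions if a is not None]     # no union needed
counts = Counter(action for action, _ in df_output)
inserts_count = counts["Insert"]
updates_count = counts["Update"]
print(inserts_count, updates_count)
```

Both counts come from the one classified result, which is the point of caching `df_actions` in the Spark version.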
> If you have to write that output to parquet anyway, then you can get
> the count quickly from the parquet file if it is partitioned by the
> _action column (Spark then only looks into parquet's metadata to get
> the count, it does not read any row):
>
> df_output.repartition(1).write.partitionBy('_action').format('parquet').mode('overwrite').save('/path/to/output.parquet')
> df_output = sql_context.read.parquet('/path/to/output.parquet')
> inserts_count = df_output.where(col('_action') == 'Insert').count()
> updates_count = df_output.where(col('_action') == 'Update').count()
>
> These are all just sketches, but I am sure you get the idea.
>
> Enrico
>
>
> On 13.02.20 at 05:08, Ashley Hoff wrote:
>> Hi,
>>
>> I am currently working on an app using PySpark to produce a daily
>> insert and update delta capture, output as Parquet.  This is
>> running on an 8-core, 32 GB Linux server in standalone mode (set to 6
>> worker cores of 2GB memory each) running Spark 2.4.3.
>>
>> This is achieved by reading data from a TSQL database into a
>> dataframe, appending a hash of each record to it, and comparing it
>> to a dataframe of yesterday's data (which has also been saved as
>> Parquet).
>>
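One thing worth checking in the hashing step: as far as I know, `concat_ws` skips null values, and with an empty separator neighbouring values run together, so different records can end up with the same hash. The plain-Python sketch below imitates that behaviour with `hashlib`. The helpers `hash_concat` and `hash_delimited`, the separator and the null marker are hypothetical choices for illustration, not anything taken from the original code.

```python
import hashlib

def hash_concat(fields, sep=""):
    """Imitates md5(concat_ws(sep, ...)): None values are skipped entirely."""
    joined = sep.join(str(f) for f in fields if f is not None)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()

def hash_delimited(fields, sep="\x1f", null_marker="\x00"):
    """Keeps field boundaries and nulls visible in the hashed string."""
    joined = sep.join(null_marker if f is None else str(f) for f in fields)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()

# Three different records that all collapse to the string "abc"
row_a = ("ab", "c", None)
row_b = ("a", "bc", None)
row_c = ("ab", None, "c")

same_hashes = len({hash_concat(r) for r in (row_a, row_b, row_c)})
distinct_hashes = len({hash_delimited(r) for r in (row_a, row_b, row_c)})
print(same_hashes, distinct_hashes)
```

If a change moves a value from one column to its neighbour, the empty-separator hash would not flag it as an update. A separator and null marker that cannot occur in the data avoid this, assuming such characters exist for the table in question.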
>> As part of the monitoring and logging, I am trying to count the
>> number of records for the respective actions.  Example code:
>>
>> df_source = spark_session.read.format('jdbc').....
>> df_reference = sql_context.read.parquet('/path/to/reference.parquet')
>> df_source_hashed = df_source.withColumn('hashkey', md5(concat_ws('', *df_source.columns))) \
>>     .cache()
>>
>> df_inserts = df_source_hashed.join(df_reference, pk_list, how='left_anti') \
>>     .select(lit('Insert').alias('_action'), *df_source_hashed) \
>>     .dropDuplicates() \
>>     .cache()
>> inserts_count = df_inserts.count()
>>
>> df_updates = df_source_hashed.alias('a').join(df_reference.alias('b'), pk_list, how="inner") \
>>     .select(lit('Update').alias('_action'), *df_source_hashed) \
>>     .where(col('a.hashkey') != col('b.hashkey')) \
>>     .dropDuplicates() \
>>     .cache()
>> updates_count = df_updates.count()
>>
>> df_output = df_inserts.union(df_updates)
>> df_output.repartition(1).write.format('parquet').mode('overwrite').save('/path/to/output.parquet')
>>
>> The above code is run as two concurrent instances via Python
>> threading.Thread (this is to try to overcome the network
>> bottleneck when connecting to the database server).
>>
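On the threading side: with a bare `threading.Thread`, an exception raised inside the target function is not re-raised in the launching thread, so a failed run can be easy to miss. Whether that plays any part in the behaviour described here is not known. The sketch below shows one possible way to launch two runs and collect either their counts or their errors, using `concurrent.futures` instead. The function `run_delta` and the table names are hypothetical stand-ins for the real delta-capture job.

```python
from concurrent.futures import ThreadPoolExecutor

def run_delta(table_name):
    """Hypothetical stand-in for one delta-capture run; returns its counts."""
    if not table_name:
        raise ValueError("table name must not be empty")
    return {"table": table_name, "inserts": 0, "updates": 0}

def run_all(table_names, max_workers=2):
    """Run the jobs concurrently and keep results and failures separate."""
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {name: pool.submit(run_delta, name) for name in table_names}
        for name, future in futures.items():
            try:
                results[name] = future.result()  # re-raises the worker's exception here
            except Exception as exc:
                errors[name] = exc
    return results, errors

# The empty name is there only to show a failing run being captured
results, errors = run_all(["orders", ""])
print(sorted(results), {name: type(exc).__name__ for name, exc in errors.items()})
```

Logging `errors` next to the counts makes it possible to tell a run that found nothing from a run that failed.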
>> What I am finding is I am getting some very inconsistent behavior
>> with the counts.  Occasionally, it appears that it will freeze up on
>> a count operation for a few minutes and quite often that specific
>> data frame will have zero records in it.  According to the DAG
>> (which I am not 100% sure how to read) the following is the
>> processing flow:
>>
>> Exchange/ShuffledRowRDD [74] count at NativeMethodAccessorImpl.java:0
>>  => WholeStageCodegen/MapPartitionsRDD [75] count at NativeMethodAccessorImpl.java:0
>>  => InMemoryTableScan/MapPartitionsRDD [78] count at NativeMethodAccessorImpl.java:0
>>  => MapPartitionsRDD [79] count at NativeMethodAccessorImpl.java:0
>>  => WholeStageCodegen/MapPartitionsRDD [80] count at NativeMethodAccessorImpl.java:0
>>  => Exchange/MapPartitionsRDD [81] count at NativeMethodAccessorImpl.java:0
>>
>> The other observation is that if I remove the counts from
>> the data frame operations and instead open the output parquet
>> file and count using a
>> `sql_context.read.load('/path/to/output.parquet').filter(col("_action")
>> == "Insert").count()` command, I reduce my run times by around
>> 20 to 30%.  To my feeble mind, opening the outputs and re-reading
>> them seems counter-intuitive.
>>
>> Is anyone able to give me some guidance on why or how to ensure that
>> I am doing the above as efficiently as possible?
>>
>> Best Regards
>> Ashley


--
nicolas paris
