> either materialize the Dataframe on HDFS (e.g. parquet or checkpoint)

I wonder if Avro is a better candidate for this: since it is
row-oriented, it should be faster to write and read for such a task. I
had never heard about checkpoint.
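
For reference, a rough sketch of both options (paths are placeholders,
spark is assumed to be the SparkSession, and the Avro write needs the
external spark-avro package on Spark 2.4):

    # option 1: checkpoint - persists the dataframe and truncates its lineage
    spark.sparkContext.setCheckpointDir('/tmp/checkpoints')
    df = df.checkpoint()  # eager by default, returns a checkpointed dataframe

    # option 2: materialize as Avro and read it back
    df.write.format('avro').mode('overwrite').save('/tmp/intermediate_avro')
    df = spark.read.format('avro').load('/tmp/intermediate_avro')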

Enrico Minack <m...@enrico.minack.dev> writes:

> It is not about very large or small, it is about how large your
> cluster is w.r.t. your data. Caching is only useful if you have the
> respective memory available across your executors. Otherwise you could
> either materialize the Dataframe on HDFS (e.g. parquet or checkpoint)
> or indeed have to do the join twice. It's a memory-over-CPU trade-off.
>
> Enrico
>
>
> On 17.02.20 at 22:06, Nicolas PARIS wrote:
>>> .dropDuplicates() \ .cache()
>>> Since df_actions is cached, you can count inserts and updates quickly
>>> with only that one join in df_actions:
>> Hi Enrico. I am wondering if this is OK for very large tables. Is
>> caching faster than recomputing both the inserts and the updates?
>>
>> Thanks
>>
>> Enrico Minack <m...@enrico.minack.dev> writes:
>>
>>> Ashley,
>>>
>>> I want to suggest a few optimizations. The problem might go away but
>>> at least performance should improve.
>>> The freeze problems could have many reasons; the Spark UI SQL pages
>>> and stages detail pages would be useful. You can send them privately,
>>> if you wish.
>>>
>>> 1. the repartition(1) should be replaced by coalesce(1). The former
>>> will shuffle all data, while the latter will read in the existing
>>> partitions and not shuffle them again.
>>> 2. Repartitioning to a single partition is discouraged, unless you can
>>> guarantee the data fit into one worker's memory.
>>> 3. You can compute Insert and Update in one go, so that you don't have
>>> to join with df_reference twice.
>>>
>>> df_actions = df_source_hashed.alias('a') \
>>>     .join(df_reference.alias('b'), pk_list, how="left") \
>>>     .withColumn('_action', when(col('b.hashkey').isNull(), 'Insert')
>>>                 .when(col('a.hashkey') != col('b.hashkey'), 'Update')) \
>>>     .select(col('_action'), *df_source_hashed) \
>>>     .dropDuplicates() \
>>>     .cache()
>>>
>>> Since df_actions is cached, you can count inserts and updates quickly
>>> with only that one join in df_actions:
>>>
>>> inserts_count = df_actions.where(col('_action') == 'Insert').count()
>>> updates_count = df_actions.where(col('_action') == 'Update').count()
>>>
>>> And you can get rid of the union:
>>>
>>> df_output = df_actions.where(col('_action').isNotNull())
>>>
>>> If you have to write that output to parquet anyway, then you can get
>>> the count quickly from the parquet file if it is partitioned by the
>>> _action column (Spark then only looks into parquet's metadata to get
>>> the count; it does not read any rows):
>>>
>>> df_output.repartition(1).write.partitionBy('_action') \
>>>     .format('parquet').mode('overwrite').save('/path/to/output.parquet')
>>> df_output = sql_context.read.parquet('/path/to/output.parquet')
>>> inserts_count = df_output.where(col('_action') == 'Insert').count()
>>> updates_count = df_output.where(col('_action') == 'Update').count()
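>>>
>>> And per 1. above, a sketch of the same write with coalesce instead of
>>> repartition, avoiding the extra shuffle:
>>>
>>> df_output.coalesce(1).write.partitionBy('_action') \
>>>     .format('parquet').mode('overwrite').save('/path/to/output.parquet')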
>>>
>>> These are all just sketches, but I am sure you get the idea.
>>>
>>> Enrico
>>>
>>>
>>> On 13.02.20 at 05:08, Ashley Hoff wrote:
>>>> Hi,
>>>>
>>>> I am currently working on an app using PySpark to produce a daily
>>>> insert and update delta capture, output as Parquet.  This is running
>>>> on an 8-core, 32 GB Linux server in standalone mode (set to 6 worker
>>>> cores of 2 GB memory each) running Spark 2.4.3.
>>>>
>>>> This is being achieved by reading data from a T-SQL database into a
>>>> dataframe, appending a hash of all records to it, and comparing it to
>>>> a dataframe of yesterday's data (which has also been saved as
>>>> parquet).
>>>>
>>>> As part of the monitoring and logging, I am trying to count the
>>>> number of records for the respective actions.  Example code:
>>>> df_source = spark_session.read.format('jdbc').....
>>>> df_reference = sql_context.read.parquet('/path/to/reference.parquet')
>>>>
>>>> df_source_hashed = df_source.withColumn('hashkey',
>>>>         md5(concat_ws('', *df_source.columns))) \
>>>>     .cache()
>>>>
>>>> df_inserts = df_source_hashed.join(df_reference, pk_list, how='left_anti') \
>>>>     .select(lit('Insert').alias('_action'), *df_source_hashed) \
>>>>     .dropDuplicates() \
>>>>     .cache()
>>>> inserts_count = df_inserts.count()
>>>>
>>>> df_updates = df_source_hashed.alias('a') \
>>>>     .join(df_reference.alias('b'), pk_list, how="inner") \
>>>>     .select(lit('Update').alias('_action'), *df_source_hashed) \
>>>>     .where(col('a.hashkey') != col('b.hashkey')) \
>>>>     .dropDuplicates() \
>>>>     .cache()
>>>> updates_count = df_updates.count()
>>>>
>>>> df_output = df_inserts.union(df_updates)
>>>> df_output.repartition(1).write.format('parquet') \
>>>>     .mode('overwrite').save('/path/to/output.parquet')
>>>> The above code runs as two concurrent instances via Python's
>>>> threading.Thread (this is to try to overcome the network bottleneck
>>>> connecting to the database server).
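>>>>
>>>> Roughly, the concurrency looks like this (just a sketch; run_capture
>>>> and the source names are placeholders for the per-source pipeline
>>>> above):
>>>>
>>>> import threading
>>>>
>>>> threads = [threading.Thread(target=run_capture, args=(src,))
>>>>            for src in ('source_a', 'source_b')]
>>>> for t in threads:
>>>>     t.start()
>>>> for t in threads:
>>>>     t.join()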
>>>>
>>>> What I am finding is I am getting some very inconsistent behavior
>>>> with the counts.  Occasionally, it appears that it will freeze up on
>>>> a count operation for a few minutes and quite often that specific
>>>> data frame will have zero records in it.  According to the DAG
>>>> (which I am not 100% sure how to read) the following is the
>>>> processing flow:
>>>>
>>>> Exchange/ShuffledRowRDD [74] count at NativeMethodAccessorImpl.java:0
>>>>   => WholeStageCodegen/MapPartitionsRDD [75] count at NativeMethodAccessorImpl.java:0
>>>>   => InMemoryTableScan/MapPartitionsRDD [78] count at NativeMethodAccessorImpl.java:0
>>>>   => MapPartitionsRDD [79] count at NativeMethodAccessorImpl.java:0
>>>>   => WholeStageCodegen/MapPartitionsRDD [80] count at NativeMethodAccessorImpl.java:0
>>>>   => Exchange/MapPartitionsRDD [81] count at NativeMethodAccessorImpl.java:0
>>>>
>>>> The other observation I have made is that if I remove the counts from
>>>> the dataframe operations and instead open the outputted parquet
>>>> file and count using a
>>>> `sql_context.read.load('/path/to/output.parquet').filter(col("_action")
>>>> == "Insert").count()` command, I reduce my run-times by around
>>>> 20 to 30%.  In my feeble mind, opening up the outputs and re-reading
>>>> them seems counter-intuitive.
>>>>
>>>> Is anyone able to give me some guidance on why this happens, or how
>>>> to ensure that I am doing the above as efficiently as possible?
>>>>
>>>> Best Regards
>>>> Ashley
>>
>> --
>> nicolas paris
>>


-- 
nicolas paris

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
