> either materialize the Dataframe on HDFS (e.g. parquet or checkpoint)

I wonder if avro is a better candidate for this: because it is row
oriented, it should be faster to write and read for such a task. I
hadn't heard about checkpoint before.
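For reference, a rough sketch of both options in PySpark. This is only an
illustration; the paths, the session setup and the need for the external
spark-avro package are assumptions on my side, not details from the thread:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.range(10)  # stand-in for the expensive joined DataFrame

    # Option 1: checkpoint -- writes the data to the checkpoint directory on
    # HDFS and truncates the lineage, returning a DataFrame backed by those files.
    spark.sparkContext.setCheckpointDir('hdfs:///tmp/checkpoints')
    df = df.checkpoint()

    # Option 2: write to HDFS and read it back. Parquet is columnar; avro is
    # row oriented but needs the external spark-avro package on the classpath,
    # e.g. df.write.format('avro').mode('overwrite').save('hdfs:///tmp/df.avro')
    df.write.mode('overwrite').parquet('hdfs:///tmp/df.parquet')
    df = spark.read.parquet('hdfs:///tmp/df.parquet')

Whether avro actually beats parquet here probably depends on how wide the
rows are and whether the read-back touches all columns.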
Enrico Minack <m...@enrico.minack.dev> writes:

> It is not about very large or small, it is about how large your
> cluster is w.r.t. your data. Caching is only useful if you have the
> respective memory available across your executors. Otherwise you could
> either materialize the Dataframe on HDFS (e.g. parquet or checkpoint)
> or indeed have to do the join twice. It's a memory-over-CPU trade-off.
>
> Enrico
>
>
> On 17.02.20 at 22:06, Nicolas PARIS wrote:
>>>     .dropDuplicates() \
>>>     .cache()
>>>
>>> Since df_actions is cached, you can count inserts and updates quickly
>>> with only that one join in df_actions:
>>
>> Hi Enrico. I am wondering if this is ok for very large tables? Is
>> caching faster than recomputing both insert/update?
>>
>> Thanks
>>
>> Enrico Minack <m...@enrico.minack.dev> writes:
>>
>>> Ashley,
>>>
>>> I want to suggest a few optimizations. The problem might go away, but
>>> at least performance should improve. The freeze problems could have
>>> many reasons; the Spark UI SQL pages and stages detail pages would be
>>> useful. You can send them privately, if you wish.
>>>
>>> 1. The repartition(1) should be replaced by coalesce(1). The former
>>>    will shuffle all data, while the latter will read in the existing
>>>    partitions and not shuffle them again.
>>> 2. Repartitioning to a single partition is discouraged, unless you
>>>    can guarantee the data fits into one worker's memory.
>>> 3. You can compute Insert and Update in one go, so that you don't
>>>    have to join with df_reference twice.
>>>
>>> df_actions = df_source_hashed.alias('a') \
>>>     .join(df_reference.alias('b'), pk_list, how='left') \
>>>     .withColumn('_action',
>>>                 when(col('b.hashkey').isNull(), 'Insert')
>>>                 .when(col('a.hashkey') != col('b.hashkey'), 'Update')) \
>>>     .select(col('_action'), *df_source_hashed) \
>>>     .dropDuplicates() \
>>>     .cache()
>>>
>>> Since df_actions is cached, you can count inserts and updates quickly
>>> with only that one join in df_actions:
>>>
>>> inserts_count = df_actions.where(col('_action') == 'Insert').count()
>>> updates_count = df_actions.where(col('_action') == 'Update').count()
>>>
>>> And you can get rid of the union:
>>>
>>> df_output = df_actions.where(col('_action').isNotNull())
>>>
>>> If you have to write that output to parquet anyway, then you can get
>>> the count quickly from the parquet file if it is partitioned by the
>>> _action column (Spark then only looks into parquet's metadata to get
>>> the count, it does not read any row):
>>>
>>> df_output.repartition(1).write.partitionBy('_action').format('parquet').mode('overwrite').save('/path/to/output.parquet')
>>> df_output = sql_context.read.parquet('/path/to/output.parquet')
>>> inserts_count = df_output.where(col('_action') == 'Insert').count()
>>> updates_count = df_output.where(col('_action') == 'Update').count()
>>>
>>> These are all just sketches, but I am sure you get the idea.
>>>
>>> Enrico
>>>
>>>
>>> On 13.02.20 at 05:08, Ashley Hoff wrote:
>>>> Hi,
>>>>
>>>> I am currently working on an app using PySpark to produce an insert
>>>> and update daily delta capture, being outputted as Parquet. This is
>>>> running on an 8 core 32 GB Linux server in standalone mode (set to 6
>>>> worker cores of 2 GB memory each) running Spark 2.4.3.
>>>>
>>>> This is being achieved by reading in data from a TSQL database into
>>>> a dataframe, which has a hash of all records appended to it, and
>>>> comparing it to a dataframe from yesterday's data (which has also
>>>> been saved as parquet).
>>>>
>>>> As part of the monitoring and logging, I am trying to count the
>>>> number of records for the respective actions. Example code:
>>>>
>>>> df_source = spark_session.read.format('jdbc').....
>>>> df_reference = sql_context.read.parquet('/path/to/reference.parquet')
>>>>
>>>> df_source_hashed = df_source.withColumn('hashkey', md5(concat_ws('', *df_source.columns))) \
>>>>     .cache()
>>>>
>>>> df_inserts = df_source_hashed.join(df_reference, pk_list, how='left_anti') \
>>>>     .select(lit('Insert').alias('_action'), *df_source_hashed) \
>>>>     .dropDuplicates() \
>>>>     .cache()
>>>> inserts_count = df_inserts.count()
>>>>
>>>> df_updates = df_source_hashed.alias('a').join(df_reference.alias('b'), pk_list, how='inner') \
>>>>     .select(lit('Update').alias('_action'), *df_source_hashed) \
>>>>     .where(col('a.hashkey') != col('b.hashkey')) \
>>>>     .dropDuplicates() \
>>>>     .cache()
>>>> updates_count = df_updates.count()
>>>>
>>>> df_output = df_inserts.union(df_updates)
>>>> df_output.repartition(1).write.format('parquet').mode('overwrite').save('/path/to/output.parquet')
>>>>
>>>> The above code is run as two occurrences concurrently via Python
>>>> threading.Thread (this is to try and overcome the network bottleneck
>>>> connecting to the database server).
>>>>
>>>> What I am finding is that I am getting some very inconsistent
>>>> behavior with the counts. Occasionally, it appears that it will
>>>> freeze up on a count operation for a few minutes, and quite often
>>>> that specific data frame will have zero records in it. According to
>>>> the DAG (which I am not 100% sure how to read) the following is the
>>>> processing flow:
>>>>
>>>> Exchange/ShuffledRowRDD [74] count at NativeMethodAccessorImpl.java:0
>>>> => WholeStageCodegen/MapPartitionsRDD [75] count at NativeMethodAccessorImpl.java:0
>>>> => InMemoryTableScan/MapPartitionsRDD [78] count at NativeMethodAccessorImpl.java:0
>>>> => MapPartitionsRDD [79] count at NativeMethodAccessorImpl.java:0
>>>> => WholeStageCodegen/MapPartitionsRDD [80] count at NativeMethodAccessorImpl.java:0
>>>> => Exchange/MapPartitionsRDD [81] count at NativeMethodAccessorImpl.java:0
>>>>
>>>> The other observation I have made is that if I remove the counts
>>>> from the data frame operations and instead open the outputted
>>>> parquet file and count using a
>>>> `sql_context.read.load('/path/to/output.parquet').filter(col("_action") == "Insert").count()`
>>>> command, I am reducing my run-times by around 20 to 30%. In my
>>>> feeble mind, opening up the outputs and re-reading them seems
>>>> counter-intuitive.
>>>>
>>>> Is anyone able to give me some guidance on why or how to ensure that
>>>> I am doing the above as efficiently as possible?
>>>>
>>>> Best Regards
>>>> Ashley

--
nicolas paris
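To close the thread out, here is a consolidated, runnable sketch of the
single-join variant discussed above (one left join, an _action column, one
partitioned write, counts taken from the written parquet). The toy data,
column names and the /tmp output path are placeholders of mine, not values
from the thread:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, concat_ws, md5, when

    spark = SparkSession.builder.getOrCreate()

    pk_list = ['id']
    # Toy stand-ins for the jdbc source and yesterday's reference parquet.
    df_source = spark.createDataFrame(
        [('1', 'a'), ('2', 'b'), ('3', 'c')], ['id', 'payload'])
    df_reference = spark.createDataFrame(
        [('2', 'b'), ('3', 'x')], ['id', 'payload']) \
        .withColumn('hashkey', md5(concat_ws('', 'id', 'payload')))

    df_source_hashed = df_source.withColumn(
        'hashkey', md5(concat_ws('', *df_source.columns)))

    # One left join classifies every row; unchanged rows keep a null _action.
    df_actions = df_source_hashed.alias('a') \
        .join(df_reference.alias('b'), pk_list, how='left') \
        .withColumn('_action',
                    when(col('b.hashkey').isNull(), 'Insert')
                    .when(col('a.hashkey') != col('b.hashkey'), 'Update')) \
        .select('_action', *pk_list,
                *[col('a.' + c) for c in df_source_hashed.columns
                  if c not in pk_list]) \
        .where(col('_action').isNotNull())

    # Single write partitioned by _action, then cheap counts on the read-back.
    df_actions.coalesce(1).write.partitionBy('_action') \
        .mode('overwrite').parquet('/tmp/output.parquet')

    df_out = spark.read.parquet('/tmp/output.parquet')
    inserts_count = df_out.where(col('_action') == 'Insert').count()  # 1 (id '1')
    updates_count = df_out.where(col('_action') == 'Update').count()  # 1 (id '3')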