Hi Enrico. I am wondering if this is OK for very large tables? Is caching faster than recomputing both insert/update? Thanks

Enrico Minack <m...@enrico.minack.dev> writes:

> Ashley,
>
> I want to suggest a few optimizations. The problem might go away, but at
> least performance should improve. The freeze problems could have many
> reasons; the Spark UI SQL pages and stage detail pages would be useful.
> You can send them privately, if you wish.
>
> 1. The repartition(1) should be replaced by coalesce(1). The former will
>    shuffle all data, while the latter will read in the existing partitions
>    and not shuffle them again.
> 2. Repartitioning to a single partition is discouraged, unless you can
>    guarantee the data fit into one worker's memory.
> 3. You can compute Insert and Update in one go, so that you don't have to
>    join with df_reference twice:
>
>     df_actions = df_source_hashed.alias('a') \
>         .join(df_reference.alias('b'), pk_list, how='left') \
>         .withColumn('_action',
>                     when(col('b.hashkey').isNull(), 'Insert')
>                     .when(col('a.hashkey') != col('b.hashkey'), 'Update')) \
>         .select(col('_action'), *df_source_hashed) \
>         .dropDuplicates() \
>         .cache()
>
> Since df_actions is cached, you can count inserts and updates quickly
> with only that one join in df_actions:
>
>     inserts_count = df_actions.where(col('_action') == 'Insert').count()
>     updates_count = df_actions.where(col('_action') == 'Update').count()
>
> And you can get rid of the union:
>
>     df_output = df_actions.where(col('_action').isNotNull())
>
> If you have to write that output to parquet anyway, then you can get the
> count quickly from the parquet file if it is partitioned by the _action
> column (Spark then only looks into parquet's metadata to get the count,
> it does not read any row):
>
>     df_output.repartition(1).write.partitionBy('_action').format('parquet').mode('overwrite').save('/path/to/output.parquet')
>     df_output = sql_context.read.parquet('/path/to/output.parquet')
>     inserts_count = df_output.where(col('_action') == 'Insert').count()
>     updates_count = df_output.where(col('_action') == 'Update').count()
>
> These are all just sketches, but I am sure you get the idea.
>
> Enrico
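Pulling Enrico's suggestions together, a self-contained sketch of the combined pipeline might look like the following. It is only a sketch: the session setup, pk_list, the JDBC options and the paths are placeholders standing in for values elided above, and renaming the reference hash to ref_hashkey is an assumption made here so that no column reference is ambiguous after the join.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, concat_ws, md5, when

    spark = SparkSession.builder.appName('daily-delta-capture').getOrCreate()

    pk_list = ['id']        # placeholder primary-key columns
    jdbc_options = {}       # placeholder for the JDBC settings elided above

    # Source rows plus a hash over every column, as in Ashley's original code.
    df_source = spark.read.format('jdbc').options(**jdbc_options).load()
    df_source_hashed = df_source.withColumn(
        'hashkey', md5(concat_ws('', *df_source.columns)))

    # Yesterday's snapshot; keep only the key columns and its hash, renamed so
    # nothing clashes with the source columns (an assumption, not from the thread).
    df_reference = spark.read.parquet('/path/to/reference.parquet') \
        .select(pk_list + [col('hashkey').alias('ref_hashkey')])

    # One left join classifies every source row (Enrico's point 3):
    # no match -> Insert, different hash -> Update, same hash -> null (unchanged).
    df_actions = df_source_hashed.join(df_reference, pk_list, how='left') \
        .withColumn('_action',
                    when(col('ref_hashkey').isNull(), 'Insert')
                    .when(col('hashkey') != col('ref_hashkey'), 'Update')) \
        .select('_action', *df_source_hashed.columns) \
        .dropDuplicates()

    # Write only the changed rows, partitioned by _action, with coalesce(1)
    # instead of repartition(1) (per Enrico's point 1; point 2 still applies:
    # a single partition only makes sense if the delta fits in one task).
    df_actions.where(col('_action').isNotNull()) \
        .coalesce(1) \
        .write.partitionBy('_action').mode('overwrite') \
        .parquet('/path/to/output.parquet')

    # The counts come from the written output, so nothing needs to be cached,
    # and each filter prunes to a single _action=... directory.
    df_output = spark.read.parquet('/path/to/output.parquet')
    inserts_count = df_output.where(col('_action') == 'Insert').count()
    updates_count = df_output.where(col('_action') == 'Update').count()

Whether this beats caching df_actions and counting it twice (Nicolas's question) depends on how expensive the JDBC read and the join are compared with writing and re-reading the small delta; Ashley's observation below, that counting from the written parquet cut run-times by 20 to 30%, points in that direction.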
> On 13.02.20 05:08, Ashley Hoff wrote:
>> Hi,
>>
>> I am currently working on an app using PySpark to produce an insert and
>> update daily delta capture, outputted as Parquet. This is running on an
>> 8-core, 32 GB Linux server in standalone mode (set to 6 worker cores of
>> 2 GB memory each) running Spark 2.4.3.
>>
>> This is achieved by reading data from a TSQL database into a dataframe,
>> appending a hash of each record to it, and comparing it to a dataframe
>> built from yesterday's data (which has also been saved as parquet).
>>
>> As part of the monitoring and logging, I am trying to count the number
>> of records for the respective actions. Example code:
>>
>>     df_source = spark_session.read.format('jdbc').....
>>     df_reference = sql_context.read.parquet('/path/to/reference.parquet')
>>
>>     df_source_hashed = df_source.withColumn('hashkey', md5(concat_ws('', *df_source.columns))) \
>>         .cache()
>>
>>     df_inserts = df_source_hashed.join(df_reference, pk_list, how='left_anti') \
>>         .select(lit('Insert').alias('_action'), *df_source_hashed) \
>>         .dropDuplicates() \
>>         .cache()
>>     inserts_count = df_inserts.count()
>>
>>     df_updates = df_source_hashed.alias('a').join(df_reference.alias('b'), pk_list, how="inner") \
>>         .select(lit('Update').alias('_action'), *df_source_hashed) \
>>         .where(col('a.hashkey') != col('b.hashkey')) \
>>         .dropDuplicates() \
>>         .cache()
>>     updates_count = df_updates.count()
>>
>>     df_output = df_inserts.union(df_updates)
>>     df_output.repartition(1).write.format('parquet').mode('overwrite').save('/path/to/output.parquet')
>>
>> The above code is run as two occurrences concurrently via Python
>> threading.Thread (this is to try and overcome the network bottleneck
>> connecting to the database server).
>>
>> What I am finding is that I am getting some very inconsistent behavior
>> with the counts. Occasionally it appears to freeze up on a count
>> operation for a few minutes, and quite often that specific data frame
>> will have zero records in it. According to the DAG (which I am not 100%
>> sure how to read) the following is the processing flow:
>>
>>     Exchange/ShuffledRowRDD [74] count at NativeMethodAccessorImpl.java:0
>>     => WholeStageCodegen/MapPartitionsRDD [75] count at NativeMethodAccessorImpl.java:0
>>     => InMemoryTableScan/MapPartitionsRDD [78] count at NativeMethodAccessorImpl.java:0
>>     => MapPartitionsRDD [79] count at NativeMethodAccessorImpl.java:0
>>     => WholeStageCodegen/MapPartitionsRDD [80] count at NativeMethodAccessorImpl.java:0
>>     => Exchange/MapPartitionsRDD [81] count at NativeMethodAccessorImpl.java:0
>>
>> The other observation is that if I remove the counts from the data frame
>> operations and instead open the outputted parquet file and count using a
>> `sql_context.read.load('/path/to/output.parquet').filter(col("_action") == "Insert").count()`
>> command, I reduce my run-times by around 20 to 30%. In my feeble mind,
>> opening up the outputs and re-reading them seems counter-intuitive.
>>
>> Is anyone able to give me some guidance on why, or how to ensure that I
>> am doing the above as efficiently as possible?
>>
>> Best Regards
>> Ashley

--
nicolas paris
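Ashley's note about running two occurrences concurrently via threading.Thread is described only in prose above; a minimal sketch of what that driver-side code might look like follows. run_delta_capture and the table names are hypothetical placeholders, not code from the thread.

    import threading

    def run_delta_capture(table_name):
        # Placeholder for one run of the pipeline shown above: read table_name
        # over JDBC, hash the rows, compare against yesterday's snapshot,
        # write the delta and record the counts.
        pass

    # Two captures submitted from the same SparkSession: each action runs as a
    # separate Spark job, and the jobs share the same pool of executor cores.
    threads = [threading.Thread(target=run_delta_capture, args=(name,))
               for name in ('source_table_a', 'source_table_b')]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

By default, jobs submitted from the same SparkSession are scheduled FIFO, so with 6 worker cores the two captures contend for the same executors.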