[ 
https://issues.apache.org/jira/browse/SPARK-25940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16674704#comment-16674704
 ] 

Yuming Wang commented on SPARK-25940:
-------------------------------------

[~aphilp] Please avoid setting {{Fix Version/s}} and {{Target Version/s}}; these 
fields are usually reserved for committers.

> Add the ability to tag datasets so that tags appear with RDDs in 
> StageCompleted events when the corresponding sources are read/written
> --------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-25940
>                 URL: https://issues.apache.org/jira/browse/SPARK-25940
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core, SQL
>    Affects Versions: 2.2.0, 2.3.0
>            Reporter: Aby Philip
>            Priority: Minor
>             Fix For: 2.3.0, 2.3.1
>
>
> At work, I recently made some private changes to Spark code to use with our 
> product. I would like advice on how the changes can be merged and how to go 
> about it.
> Summary
> - Datasets can be tagged using an API, e.g. "SRC_source1". 
> - The tags ("SRC_source1") are propagated through the execution so that 
> they appear in the StageCompleted events (along with RDD names) when the 
> corresponding source is read from or written to.
> - This makes it possible to associate internal accumulables (e.g. 
> spark.internal.metrics.recordsWritten) with a Dataset (and hence to use 
> the value as the row count for the source).
> Background 
> Spark is used to create ETL pipelines in the product I work on. Spark code 
> is generated from a data pipeline, i.e. when the pipeline is executed, an 
> equivalent Spark program is generated and run on a YARN cluster. There can be 
> one or more sources and targets in the pipeline. The requirement is to get 
> the number of records read from each source and the number of records written 
> to each target. I would like to avoid using count() for performance reasons. 
> I have tried the solutions that have been suggested earlier, e.g. named 
> accumulators and createTempView() (as suggested in 
> https://github.com/apache/spark/pull/16609#issuecomment-281865742), but each 
> had a performance impact with big datasets. (The number of records can run 
> into millions.)
> Changes: 
> I made the following changes: 
> 1. Added an API to 'tag()' a dataframe. The tag is propagated all the way 
> through the execution and populated into the SparkListener events. In the 
> StageCompleted event, the tags corresponding to the RDDs (if any) are listed 
> too. (This makes it possible to correlate 
> 'spark.internal.metrics.recordsWritten' with a source whose tag is present 
> in the event.) 
> 2. The accumulable 'spark.internal.metrics.recordsWritten' was missing from 
> events because it was not incremented in some cases, so I changed the code 
> to increment it in those cases too.
> Similar JIRAs: 
> I got this idea from the comment 
> https://github.com/apache/spark/pull/16609#pullrequestreview-17823555, which 
> mentions 'tagging' a dataframe. However, the focus of that discussion was 
> on the Spark UI. I have not made any changes to the Spark UI but have instead 
> exposed the changes only through SparkListener events.
> Limitations:
> If a stage reads from more than one source or writes to more than one 
> source, the \{recordsRead|recordsWritten} accumulable represents the 
> total number of rows across these sources. All of their tags would 
> appear in the event. The only downside is that the number of rows for each 
> source cannot be distinguished.
> Since this is the first time I am trying to submit a patch, I would like some 
> advice on how to proceed.
> - I made the changes on 2.1.0 initially and later merged them onto 2.2.1, 
> 2.3.0 and 2.3.1. I need advice on how I should submit the patch.
> - The testing was done manually on the product, so I may have to add unit 
> tests for this. 
> - I have not focused on the Spark UI. The product just uses 
> SparkListenerStageCompleted events. Is it necessary to make UI changes before 
> I submit the patch?
> Please advise.
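
The limitation described in the quoted issue (a stage that touches several tagged sources reports only one aggregate count) can be illustrated with a small, Spark-free Python sketch. Everything here is an assumption made for illustration: `StageEvent` and `attribute_counts` are hypothetical names, not Spark classes or part of the proposed patch, and the tags "SRC_source2" and "SRC_source3" are made up alongside the "SRC_source1" example from the issue. The sketch attributes a stage's record count to a tag only when the stage carries exactly one tag.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class StageEvent:
    """Hypothetical stand-in for a stage-completed event (not a Spark class)."""
    stage_id: int
    tags: Tuple[str, ...]   # tags of the datasets touched by this stage
    records_read: int       # aggregate accumulable value for the whole stage


def attribute_counts(
    events: List[StageEvent],
) -> Tuple[Dict[str, int], List[StageEvent]]:
    """Return per-tag row counts plus the events that cannot be attributed."""
    per_tag: Dict[str, int] = {}
    ambiguous: List[StageEvent] = []
    for event in events:
        if len(event.tags) == 1:
            # Exactly one tag: the aggregate count belongs to that source.
            tag = event.tags[0]
            per_tag[tag] = per_tag.get(tag, 0) + event.records_read
        elif len(event.tags) > 1:
            # Several tags: only the total is known, so no per-source split.
            ambiguous.append(event)
        # Stages without tags are ignored.
    return per_tag, ambiguous


events = [
    StageEvent(0, ("SRC_source1",), 1000),
    StageEvent(1, ("SRC_source1",), 500),
    StageEvent(2, ("SRC_source2", "SRC_source3"), 700),  # made-up tags
]
per_tag, ambiguous = attribute_counts(events)
print(per_tag)
print([e.stage_id for e in ambiguous])
```

In this toy model the two single-tag stages are summed for "SRC_source1", while stage 2 is reported as ambiguous because its 700 records cannot be divided between its two tags.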



