Aby Philip created SPARK-25940:
----------------------------------

             Summary: Add the ability to tag datasets so that tags appear with 
RDDs in StageCompleted events when the corresponding sources are read/written
                 Key: SPARK-25940
                 URL: https://issues.apache.org/jira/browse/SPARK-25940
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core, SQL
    Affects Versions: 2.3.0, 2.2.0
            Reporter: Aby Philip
             Fix For: 2.3.1, 2.3.0


At work, I recently made some private changes to the Spark code base for use with 
our product. I would like advice on how these changes can be merged upstream and 
how to go about it.

Summary:
- Datasets can be tagged via an API, e.g. with a tag such as "SRC_source1" (a 
usage sketch follows below).
- The tags (e.g. "SRC_source1") are propagated through the execution so that they 
appear in StageCompleted events (along with RDD names) when the corresponding 
source is read from or written to.
- This makes it possible to associate internal accumulables (e.g. 
spark.internal.metrics.recordsWritten) with a Dataset, and hence to report them 
as the row count for that source.
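
For illustration, here is a rough sketch of how the API could look from the user 
side. Dataset.tag() and the paths/tag names below are hypothetical - this is the 
proposed API, not something in current Spark:

    // Hypothetical usage of the proposed tagging API (not part of current Spark).
    val src = spark.read.parquet("/data/source1")
      .tag("SRC_source1")              // proposed API: attach a tag to this Dataset

    val out = src.filter($"amount" > 0)
    out.write.parquet("/data/target1")

    // The tag "SRC_source1" would then appear in the SparkListenerStageCompleted
    // event for the stage that reads from /data/source1, next to the RDD names,
    // so the stage's internal recordsRead/recordsWritten accumulables can be
    // attributed to that source.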

Background:
Spark is used to create ETL pipelines in the product I work on. Spark code is 
generated from a data pipeline definition, i.e. when the pipeline is executed, an 
equivalent Spark program is generated and run on a YARN cluster. A pipeline can 
have one or more sources and targets. The requirement is to get the number of 
records read from each source and the number of records written to each target. 
I would like to avoid using count() for performance reasons. I have tried the 
solutions suggested earlier - e.g. named accumulators and createTempView() (as 
suggested in 
https://github.com/apache/spark/pull/16609#issuecomment-281865742) - but each 
had a noticeable performance impact on big datasets (record counts can run into 
the millions).
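
For context, the named-accumulator workaround I tried looks roughly like the 
sketch below (the paths and the case class are made up for illustration); the 
extra map over every record is where the cost shows up on multi-million-row 
datasets:

    import org.apache.spark.sql.SparkSession

    object AccumulatorCountSketch {
      case class Txn(id: Long, amount: Double)   // assumed schema, illustration only

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("count-workaround").getOrCreate()
        import spark.implicits._

        val rowsRead = spark.sparkContext.longAccumulator("source1.recordsRead")

        val src = spark.read.parquet("/data/source1").as[Txn]
        // Extra narrow transformation over every record just to count it.
        // (Accumulators updated in transformations can also over-count on task retries.)
        val counted = src.map { t => rowsRead.add(1); t }

        counted.write.parquet("/data/target1")   // action; accumulator is populated here
        println(s"rows read from source1: ${rowsRead.value}")
        spark.stop()
      }
    }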

Changes: 
I made the following changes -
1. Added an API to tag() a DataFrame. The tag is propagated throughout the 
execution and surfaced in the SparkListener events. In the StageCompleted event, 
the tags corresponding to the RDDs (if any) are listed as well. This makes it 
possible to correlate 'spark.internal.metrics.recordsWritten' with the source 
whose tag is present in the event (a listener sketch follows below).
2. The accumulable 'spark.internal.metrics.recordsWritten' was missing from 
events because it was not incremented in some cases, so changes were also made 
to increment it in those cases.
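
To make the consumer side concrete, below is a rough sketch of the kind of 
listener our product uses. The tags field on RDDInfo is the hypothetical piece 
the proposed change would add; the accumulable lookup uses the existing Spark 2.x 
listener API:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

    // Sketch of a listener that correlates the proposed tags with the
    // recordsWritten accumulable of the completed stage.
    class RowCountListener extends SparkListener {
      override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
        val stage = event.stageInfo

        // Proposed/hypothetical field: tags attached to the RDDs of this stage.
        // val tags = stage.rddInfos.flatMap(_.tags)

        // Existing API: find the internal recordsWritten accumulable, if present.
        val written = stage.accumulables.values
          .find(_.name.exists(_.contains("recordsWritten")))
          .flatMap(_.value)

        // With the proposed change, the tags plus `written` give the row count
        // per tagged source/target for this stage.
        println(s"stage ${stage.stageId}: recordsWritten = ${written.getOrElse("n/a")}")
      }
    }

    // Registered via spark.sparkContext.addSparkListener(new RowCountListener)
    // or through the spark.extraListeners configuration.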

Similar JIRAs: 
I got this idea from the comment 
https://github.com/apache/spark/pull/16609#pullrequestreview-17823555, which 
mentions 'tagging' a dataframe. However, the focus of that discussion was on the 
Spark UI. I have not made any changes to the Spark UI; instead, the tags are only 
surfaced in SparkListener events.

Limitations:
If a single stage reads from more than one source or writes to more than one 
target, the {recordsRead|recordsWritten} accumulable represents the total number 
of rows across those sources/targets. All of the corresponding tags appear in the 
event, but the row count for each individual source cannot be distinguished.

Since this is the first time I am trying to submit a patch, I would like some 
advice on how to proceed:
- I made the changes on 2.1.0 initially but have since merged them onto 2.2.1, 
2.3.0 and 2.3.1. I would need advice on which branch the patch should be 
submitted against.
- The testing was done manually with the product, so I may have to add unit tests 
for this.
- I have not focused on the Spark UI; the product only consumes 
SparkListenerStageCompleted events. Is it necessary to make UI changes before I 
submit a patch?
Please advise.


