[ https://issues.apache.org/jira/browse/SPARK-25940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16674704#comment-16674704 ]
Yuming Wang commented on SPARK-25940:
-------------------------------------

[~aphilp] Please avoid setting the {{Fix Version/s}} and {{Target Version/s}} fields, which are usually reserved for committers.

> Add the ability to tag datasets so that tags appear with RDDs in StageCompleted events when the corresponding sources are read/written
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-25940
>                 URL: https://issues.apache.org/jira/browse/SPARK-25940
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core, SQL
>    Affects Versions: 2.2.0, 2.3.0
>            Reporter: Aby Philip
>            Priority: Minor
>             Fix For: 2.3.0, 2.3.1
>
> At work, I recently made some private changes to Spark code for use with our product. I would like advice on how the changes can be merged and how to go about it.
>
> Summary
> - Datasets can be tagged using an API, e.g. "SRC_source1".
> - The tags ("SRC_source1") are propagated through the execution so that they appear in the StageCompleted events (along with RDD names) when the corresponding source is read from or written to.
> - This makes it possible to associate internal accumulables (e.g. spark.internal.metrics.recordsWritten) with a Dataset, and hence to use them as the row count for that source. (A sketch of the consumer side appears at the end of this description.)
>
> Background
> Spark is used to create ETL pipelines in the product I work on. Spark code is generated from a data pipeline, i.e. when the pipeline is executed, an equivalent Spark program is generated and run on a YARN cluster. There can be one or more sources and targets in the pipeline. The requirement is to get the number of records read from each source and the number of records written to each target. I would like to avoid using count() for performance reasons. I have tried the solutions suggested earlier - e.g. named accumulators and createTempView() (as suggested in https://github.com/apache/spark/pull/16609#issuecomment-281865742) - but each had a performance impact with big datasets. (The number of records can run into the millions.)
>
> Changes:
> I made the following changes:
> 1. Added an API to tag() a DataFrame. The tag is propagated all the way through the execution and into the SparkListener events. In the StageCompleted event, the tags corresponding to the RDDs (if any) are listed too. (This makes it possible to correlate spark.internal.metrics.recordsWritten with the source whose tag is present in the event.) A rough sketch of the intended call site follows this list.
> 2. The accumulable spark.internal.metrics.recordsWritten was missing from events because it was not incremented in some cases, so changes were made to increment it as well.
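>
> To make the call site concrete, here is a minimal sketch. The tag() method does not exist in stock Spark; the implicit class below is only a stand-in so the sketch compiles, and the paths and tag names are illustrative:
>
> {code:scala}
> import org.apache.spark.sql.{DataFrame, SparkSession}
>
> object TaggingSketch {
>   // Stand-in for the proposed Dataset.tag() API. The real patch would attach
>   // the tag to the plan so the RDDs of the stages that read/write this data
>   // carry it into listener events; this stub only illustrates the call site
>   // and returns the DataFrame unchanged.
>   implicit class TaggableDataFrame(df: DataFrame) {
>     def tag(name: String): DataFrame = df
>   }
>
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession.builder().appName("tagging-sketch").getOrCreate()
>
>     // Tag the source once; the tag would then surface in the StageCompleted
>     // events of every stage that reads it.
>     val source1 = spark.read.parquet("/data/source1").tag("SRC_source1")
>     source1.write.parquet("/data/target1")
>
>     spark.stop()
>   }
> }
> {code}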
>
> Similar jiras:
> I got this idea from the comment https://github.com/apache/spark/pull/16609#pullrequestreview-17823555, which mentions 'tagging' a dataframe. However, the focus of that discussion was on the Spark UI. I have not made any changes to the Spark UI; I have instead propagated the changes only to the SparkListener events.
>
> Limitations:
> If a stage reads from more than one source or writes to more than one target, the {recordsRead|recordsWritten} accumulable represents the total number of rows across those sources, and all of their tags appear in the event. The only downside is that the per-source row counts cannot be distinguished.
>
> Since this is the first time I am trying to submit a patch, I would like some advice on how to proceed:
> - I made the changes on 2.1.0 initially but have since merged them onto 2.2.1, 2.3.0 and 2.3.1. I would need advice on how to submit the patch.
> - The testing was done manually on the product, so I may have to add unit tests for this.
> - I have not focused on the Spark UI; the product just uses SparkListenerStageCompleted events. Is it necessary to make UI changes before I submit the patch?
> Please advise.
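>
> For reference, here is a minimal sketch of the consumer side mentioned in the Summary, using the standard SparkListener API. Reading stage accumulables and RDD names works in stock Spark today; only the attribution to a tag depends on the proposed change, so the tag handling is described in a comment:
>
> {code:scala}
> import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}
>
> class RowCountListener extends SparkListener {
>   override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
>     val info = event.stageInfo
>
>     // Existing internal accumulable; matched loosely by suffix here.
>     val recordsWritten = info.accumulables.values
>       .find(_.name.exists(_.endsWith("recordsWritten")))
>       .flatMap(_.value)
>
>     // RDD names are reported today; under this proposal each RDD would also
>     // carry the tags (e.g. "SRC_source1") set via the tag() API, which is
>     // what would let recordsWritten be attributed to a specific source.
>     val rddNames = info.rddInfos.map(_.name).mkString(", ")
>
>     println(s"stage=${info.stageId} rdds=[$rddNames] recordsWritten=$recordsWritten")
>   }
> }
> {code}
>
> The listener would be registered with spark.sparkContext.addSparkListener(new RowCountListener).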