GitHub user HyukjinKwon opened a pull request: https://github.com/apache/spark/pull/23263
[SPARK-23674][ML] Adds Spark ML Events

## What changes were proposed in this pull request?

This PR proposes to add ML events so that other developers can track them and attach custom actions to them.

## Introduction

This PR proposes to send ML events in the same way SQL already does. This is quite useful when people want to track ML operations and act on them. For instance, I have been working on integrating Apache Spark with [Apache Atlas](https://atlas.apache.org/QuickStart.html). With some custom changes on top of this PR, I can visualise an ML pipeline as below:

![spark_ml_streaming_lineage](https://user-images.githubusercontent.com/6477701/49682779-394bca80-faf5-11e8-85b8-5fae28b784b3.png)

How useful it is to track SQL operations goes without saying. Likewise, I would like to propose ML events as well (as `@Unstable` APIs for now, the lowest stability level, with no guarantee about stability).

## Implementation Details

### Send events (without exposing an ML-specific listener)

In `events.scala`, it adds:

```scala
@Unstable case class ...StartEvent(caller, input)
@Unstable case class ...EndEvent(caller, output)

object MLEvents {
  // Wrappers to send events:
  // def with...Event(body) = {
  //   body()
  //   SparkContext.getOrCreate().listenerBus.post(event)
  // }
}
```

This mimics both:

**1. Catalog events (see `org.apache.spark.sql.catalyst.catalog.events.scala`)**

- This allows a catalog-specific listener, `ExternalCatalogEventListener`, to be added.
- It is implemented by wrapping the whole `ExternalCatalog` in `ExternalCatalogWithListener`, which delegates the operations to the underlying `ExternalCatalog`.

That approach is not quite possible here because in most cases instances (like `Pipeline`) are created directly, so there is no single place to wrap. We might be able to do it by extending `ListenerBus` for all possible instances, but IMHO that is too invasive. Also, exposing another ML-specific listener sounds like too much at this stage. Therefore, I simply borrowed the file name and structure here.

**2. SQL execution events (see `org.apache.spark.sql.execution.SQLExecution.scala`)**

- Adds an object that wraps a body to send events.

The current approach is closer to this one: it has `with...` wrappers that send events, and I borrowed this approach to be consistent.

### Add `...Impl` methods and wrap each call to send events

**1. `mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala`**

```diff
- def save(...) = { saveImpl(...) }
+ def save(...) = MLEvents.withSaveInstanceEvent { saveImpl(...) }
  def saveImpl(...): Unit = ...
```

Note that `saveImpl` was already implemented, unlike the other instances below.

```diff
- def load(...): T
+ def load(...): T = MLEvents.withLoadInstanceEvent { loadImpl(...) }
+ def loadImpl(...): T
```

**2. `mllib/src/main/scala/org/apache/spark/ml/Estimator.scala`**

```diff
- def fit(...): Model
+ def fit(...): Model = MLEvents.withFitEvent { fitImpl(...) }
+ def fitImpl(...): Model
```

**3. `mllib/src/main/scala/org/apache/spark/ml/Transformer.scala`**

```diff
- def transform(...): DataFrame
+ def transform(...): DataFrame = MLEvents.withTransformEvent { transformImpl(...) }
+ def transformImpl(...): DataFrame
```
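To make the developer-facing contract concrete, here is a minimal sketch of a custom `Transformer` under this scheme. It assumes the `transformImpl(dataset: Dataset[_]): DataFrame` shape implied by the diff above; the class name, column name, and logic are hypothetical and only for illustration.

```scala
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.upper
import org.apache.spark.sql.types.StructType

// Hypothetical custom Transformer. Under this PR, the developer overrides
// `transformImpl` instead of `transform`; `transform` itself wraps the call
// in `MLEvents.withTransformEvent`, so events are emitted for free.
class UppercaseTransformer(override val uid: String) extends Transformer {

  def this() = this(Identifiable.randomUID("uppercase"))

  // The event-aware hook added by this PR (signature assumed from the diff).
  override protected def transformImpl(dataset: Dataset[_]): DataFrame =
    dataset.withColumn("text", upper(dataset("text")))

  override def transformSchema(schema: StructType): StructType = schema

  override def copy(extra: ParamMap): UppercaseTransformer = defaultCopy(extra)
}
```

An existing third-party transformer that still overrides `transform` keeps working; it just does not emit events, matching the backward-compatibility notes below.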
This approach follows the existing pattern in ML:

**1. `transform` and `transformImpl`**

https://github.com/apache/spark/blob/9b1f6c8bab5401258c653d4e2efb50e97c6d282f/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala#L202-L213
https://github.com/apache/spark/blob/9b1f6c8bab5401258c653d4e2efb50e97c6d282f/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala#L191-L196
https://github.com/apache/spark/blob/9b1f6c8bab5401258c653d4e2efb50e97c6d282f/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala#L1037-L1042

**2. `save` and `saveImpl`**

https://github.com/apache/spark/blob/9b1f6c8bab5401258c653d4e2efb50e97c6d282f/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala#L166-L176

Inherited implementations are intentionally omitted here for simplicity; they are inherited and implemented in multiple places.

## Backward Compatibility

_This keeps both source and binary backward compatibility_. I considered making the `...Impl` methods abstract to force implementations, but decided to give them a default body that throws `UnsupportedOperationException` so that full source and binary compatibility is kept.

- From the user-facing API perspective, _there is no difference_. The `...Impl` methods are protected and not visible to end users.
- From the developer API perspective, developers can still override the `...` methods instead of `...Impl`; that is fine, it just means events are not handled. Developers who want events from their custom implementations should implement `...Impl`, and implementing `...Impl` is of course encouraged.

## How was this patch tested?

Manually tested, and unit tests were added.
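For context on how these events could be consumed, below is a hedged sketch of a listener. It assumes the events extend `SparkListenerEvent` and are posted to the shared listener bus, as the `MLEvents` snippet above suggests; the event class names (`FitStartEvent`, `FitEndEvent`) follow the `...StartEvent`/`...EndEvent` naming pattern from that snippet but are placeholders, not the exact names in the patch.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}

// Placeholder event shapes mirroring the `...StartEvent`/`...EndEvent`
// pattern in the description; the real classes live in ML's events.scala.
case class FitStartEvent(caller: String) extends SparkListenerEvent
case class FitEndEvent(caller: String) extends SparkListenerEvent

// A listener that reacts to the ML events. Since the events go through the
// general listener bus rather than an ML-specific one, the existing
// `onOtherEvent` hook is enough to receive them.
class MLAuditListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case FitStartEvent(caller) => println(s"fit() started by $caller")
    case FitEndEvent(caller)   => println(s"fit() finished by $caller")
    case _                     => // not an ML event; ignore
  }
}

// Registration uses the existing public API:
// SparkContext.getOrCreate().addSparkListener(new MLAuditListener)
```

This is also why no new ML-specific listener interface is needed: any tool (such as the Atlas integration mentioned above) can subscribe through the general `SparkListener` mechanism.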