[ https://issues.apache.org/jira/browse/FLINK-2073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14559542#comment-14559542 ]
ASF GitHub Bot commented on FLINK-2073:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/727#discussion_r31063043

--- Diff: docs/libs/ml/contribution_guide.md ---
@@ -20,7 +21,329 @@ specific language governing permissions and limitations under the License.
-->

The Flink community highly appreciates all sorts of contributions to FlinkML.
FlinkML offers people interested in machine learning the opportunity to work on a highly active open source project which makes scalable ML a reality.
The following document describes how to contribute to FlinkML.

* This will be replaced by the TOC
{:toc}

-Coming soon. In the meantime, check our list of [open issues on JIRA](https://issues.apache.org/jira/browse/FLINK-1748?jql=component%20%3D%20%22Machine%20Learning%20Library%22%20AND%20project%20%3D%20FLINK%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20priority%20DESC)

## Getting Started

In order to get started, first read Flink's [contribution guide](http://flink.apache.org/how-to-contribute.html).
Everything from this guide also applies to FlinkML.

## Pick a Topic

If you are looking for new ideas, you should check out the list of [unresolved issues on JIRA](https://issues.apache.org/jira/issues/?jql=component%20%3D%20%22Machine%20Learning%20Library%22%20AND%20project%20%3D%20FLINK%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20priority%20DESC).
Once you decide to contribute to one of these issues, you should take ownership of it and track your progress with this issue.
That way, the other contributors know the state of the different issues and redundant work is avoided.

If you already know what you want to contribute to FlinkML, all the better.
It is still advisable to create a JIRA issue for your idea, though, to tell the Flink community what you want to do.

## Testing

New contributions should come with tests to verify the correct behavior of the algorithm.
The tests help to maintain the algorithm's correctness throughout code changes, e.g. refactorings.

We distinguish between unit tests, which are executed during Maven's test phase, and integration tests, which are executed during Maven's verify phase.
Maven automatically makes this distinction by using the following naming rules:
all test cases whose class name ends with a suffix matching the regular expression `(IT|Integration)(Test|Suite|Case)` are considered integration tests.
The rest are considered unit tests, which should only test behavior that is local to the component under test.

An integration test is a test which requires the full Flink system to be started.
In order to do that properly, all integration test cases have to mix in the trait `FlinkTestBase`.
This trait will set the right `ExecutionEnvironment` so that the test will be executed on a special `FlinkMiniCluster` designated for testing purposes.
Thus, an integration test could look like the following:

{% highlight scala %}
class ExampleITSuite extends FlatSpec with FlinkTestBase {
  behavior of "An example algorithm"

  it should "do something" in {
    ...
  }
}
{% endhighlight %}

The test style does not have to be `FlatSpec` but can be any other scalatest `Suite` subclass.
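For example, the same test written in the `WordSpec` style might look like the following sketch (purely illustrative and not part of this diff; only the scalatest style changes, `FlinkTestBase` is still mixed in):

{% highlight scala %}
import org.scalatest.WordSpec

// Illustrative only: any scalatest style works as long as FlinkTestBase is mixed in,
// so that the test runs against the testing FlinkMiniCluster.
class ExampleWordSpecITSuite extends WordSpec with FlinkTestBase {
  "An example algorithm" should {
    "do something" in {
      // test body goes here
    }
  }
}
{% endhighlight %}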
## Documentation

When contributing new algorithms, it is required to add code comments describing how the algorithm works and the parameters with which the user can control its behavior.
Additionally, we would like to encourage contributors to add this information to the online documentation.
The online documentation for FlinkML's components can be found in the directory `docs/libs/ml`.

Every new algorithm is described by a single markdown file.
This file should contain at least the following points:

1. What does the algorithm do
2. How does the algorithm work (or a reference to a description)
3. Parameter description with default values
4. Code snippet showing how the algorithm is used

In order to use latex syntax in the markdown file, you have to include `mathjax: include` in the YAML front matter.

{% highlight java %}
---
mathjax: include
title: Example title
---
{% endhighlight %}

In order to use displayed mathematics, you have to put your latex code in `$$ ... $$`.
For in-line mathematics, use `$ ... $`.
Additionally, some predefined latex commands are included in the scope of your markdown file.
See `docs/_include/latex_commands.html` for the complete list of predefined latex commands.

## Contributing

Once you have implemented the algorithm with adequate test coverage and added documentation, you are ready to open a pull request.
Details of how to open a pull request can be found [here](http://flink.apache.org/how-to-contribute.html#contributing-code--documentation).

## How to Implement a Pipeline Operator

FlinkML follows the principle of making machine learning as easy and accessible as possible.
Therefore, it supports a flexible pipelining mechanism which allows users to quickly define their analysis pipelines consisting of a multitude of different components.
A pipeline operator is either a `Transformer` or a `Predictor`.
A `Transformer` can be fitted to training data and transforms data from one format into another.
A scaler which changes the mean and variance of its input data according to the mean and variance of some training data is an example of a `Transformer`.
In contrast, a `Predictor` encapsulates a data model and the corresponding logic to train it.
Once a `Predictor` has trained the model, it can be used to make new predictions.
A support vector machine which is first trained to obtain the support vectors and then used to classify data points is an example of a `Predictor`.
A general description of FlinkML's pipelining can be found [here]({{site.baseurl}}/libs/ml/pipelines.html).
In order to support pipelining, algorithms have to adhere to a certain design pattern, which we will describe next.

Let's assume that we want to implement a pipeline operator which changes the mean of your data.
First, we have to decide which type of pipeline operator it is.
Since centering data is a common preprocessing step in any analysis pipeline, we will implement it as a `Transformer`.
Therefore, we first create a `MeanTransformer` class which inherits from `Transformer`:

{% highlight scala %}
class MeanTransformer extends Transformer[MeanTransformer] {}
{% endhighlight %}

Since we want to be able to configure the mean of the resulting data, we have to add a configuration parameter.

{% highlight scala %}
class MeanTransformer extends Transformer[MeanTransformer] {
  def setMean(mean: Double): MeanTransformer = {
    parameters.add(MeanTransformer.Mean, mean)
    this
  }
}

object MeanTransformer {
  case object Mean extends Parameter[Double] {
    override val defaultValue: Option[Double] = Some(0.0)
  }

  def apply(): MeanTransformer = new MeanTransformer
}
{% endhighlight %}

Parameters are defined in the companion object of the transformer class and extend the `Parameter` class.
The default value will be used if no other value has been set by the user of this component.
If no default value has been specified, meaning that `defaultValue = None`, then the algorithm has to handle this situation accordingly.

We can now instantiate a `MeanTransformer` object and set the mean value of the transformed data.
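For illustration (this snippet is not part of the diff), creating and configuring the transformer uses the companion object's `apply` method and the setter defined above:

{% highlight scala %}
// Create a transformer and configure the desired mean of the transformed data.
val meanTransformer = MeanTransformer().setMean(5.0)
{% endhighlight %}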
But we still have to implement how the transformation works.
The workflow can be separated into two phases.
Within the first phase, the transformer learns the mean of the given training data.
This knowledge can then be used in the second phase to transform the provided data with respect to the configured resulting mean value.

The learning of the mean can be implemented within the `fit` operation of a `Transformer`.
Within the `fit` operation, a pipeline component is trained with respect to the given training data.
The algorithm is, however, **not** implemented by overriding the `fit` method but by providing an implementation of a corresponding `FitOperation` for the correct type.
Taking a look at the definition of the `fit` method in `Estimator`, which is the parent class of `Transformer`, reveals why this is the case.

{% highlight scala %}
trait Estimator[Self] extends WithParameters with Serializable {
  that: Self =>

  def fit[Training](
      training: DataSet[Training],
      fitParameters: ParameterMap = ParameterMap.Empty)
    (implicit fitOperation: FitOperation[Self, Training]): Unit = {
    FlinkMLTools.registerFlinkMLTypes(training.getExecutionEnvironment)
    fitOperation.fit(this, fitParameters, training)
  }
}
{% endhighlight %}

We see that the `fit` method is called with an input data set of type `Training`, an optional parameter map and, in its second parameter list, with an implicit parameter of type `FitOperation`.
Within the body of the function, first some machine learning types are registered and then the `fit` method of the `FitOperation` parameter is called.
The instance passes itself, the parameter map and the training data set as parameters to the method.
Thus, all the program logic takes place within the `FitOperation`.

The `FitOperation` has two type parameters.
The first defines the pipeline operator type for which this `FitOperation` shall work and the second type parameter defines the type of the data set elements.
If we first want the `MeanTransformer` to work on `DenseVector`, we thus have to provide an implementation for `FitOperation[MeanTransformer, DenseVector]`.

{% highlight scala %}
val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, DenseVector] {
  override def fit(instance: MeanTransformer, fitParameters: ParameterMap, input: DataSet[DenseVector]): Unit = {
    import org.apache.flink.ml.math.Breeze._
    val meanTrainingData: DataSet[DenseVector] = input
      .map{ x => (x.asBreeze, 1) }
      .reduce{
        (left, right) =>
          (left._1 + right._1, left._2 + right._2)
      }
      .map{ p => (p._1/p._2).fromBreeze }
  }
}
{% endhighlight %}

A `FitOperation[T, I]` has a `fit` method which is called with an instance of type `T`, a parameter map and an input `DataSet[I]`.
In our case `T = MeanTransformer` and `I = DenseVector`.
The parameter map is necessary if our fit step depends on some parameter values which were not given directly at creation time of the `Transformer`.
The `FitOperation` of the `MeanTransformer` sums up the `DenseVector` instances of the given input data set and divides the result by the total number of vectors.
That way, we obtain a `DataSet[DenseVector]` with a single element which is the mean value.
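As a purely illustrative aside (not part of the diff; the variable names and example data are made up), the following sketch shows how this fit operation is wired to a transformer instance. Since `denseVectorMeanFitOperation` above is a plain `val`, it is passed explicitly here; marking it `implicit` and placing it where implicit resolution can find it (for example in `MeanTransformer`'s companion object) would let the compiler fill in this argument on a plain `fit(trainingData)` call.

{% highlight scala %}
import org.apache.flink.api.scala._
import org.apache.flink.ml.math.DenseVector

val env = ExecutionEnvironment.getExecutionEnvironment

// Hypothetical training data for the example.
val trainingData: DataSet[DenseVector] = env.fromCollection(Seq(
  DenseVector(1.0, 2.0),
  DenseVector(3.0, 4.0)))

val meanTransformer = MeanTransformer().setMean(5.0)

// Pass the FitOperation explicitly into the implicit parameter list of fit.
meanTransformer.fit(trainingData)(denseVectorMeanFitOperation)
{% endhighlight %}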
But if we look closely at the implementation, we see that the result of the mean computation is never stored anywhere.
If we want to use this knowledge in a later step to adjust the mean of some other input, we have to keep it around.
And this is where the parameter of type `MeanTransformer`, which is given to the `fit` method, comes into play.
We can use this instance to store state, which is then used by a subsequent `transform` operation working on the same object.
But first we have to extend `MeanTransformer` with a member field and then adjust the `FitOperation` implementation.

{% highlight scala %}
class MeanTransformer extends Transformer[MeanTransformer] {
  var meanOption: Option[DataSet[DenseVector]] = None

  def setMean(mean: Double): MeanTransformer = {
    parameters.add(MeanTransformer.Mean, mean)
    this
  }
}

val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, DenseVector] {
  override def fit(instance: MeanTransformer, fitParameters: ParameterMap, input: DataSet[DenseVector]): Unit = {
    import org.apache.flink.ml.math.Breeze._

    instance.meanOption = Some(input
      .map{ x => (x.asBreeze, 1) }
      .reduce{
        (left, right) =>
          (left._1 + right._1, left._2 + right._2)
      }
      .map{ p => (p._1/p._2).fromBreeze })
  }
}
{% endhighlight %}

If we look at the `transform` method in `Transformer`, we will see that we also need an implementation of `TransformOperation`.
A possible mean transforming implementation could look like the following.
--- End diff --

:+1:

> Add contribution guide for FlinkML
> ----------------------------------
>
>                 Key: FLINK-2073
>                 URL: https://issues.apache.org/jira/browse/FLINK-2073
>             Project: Flink
>          Issue Type: New Feature
>          Components: Documentation, Machine Learning Library
>            Reporter: Theodore Vasiloudis
>            Assignee: Till Rohrmann
>             Fix For: 0.9
>
>
> We need a guide for contributions to FlinkML in order to encourage the extension of the library, and provide guidelines for developers.
> One thing that should be included is a step-by-step guide to create a transformer, or other Estimator



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)