[ 
https://issues.apache.org/jira/browse/FLINK-2073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14559542#comment-14559542
 ] 

ASF GitHub Bot commented on FLINK-2073:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/727#discussion_r31063043
  
    --- Diff: docs/libs/ml/contribution_guide.md ---
    @@ -20,7 +21,329 @@ specific language governing permissions and limitations
     under the License.
     -->
     
    +The Flink community highly appreciates all sorts of contributions to 
FlinkML.
    +FlinkML offers people interested in machine learning the opportunity to 
work on a highly active open source project which makes scalable ML a reality.
    +The following document describes how to contribute to FlinkML.
    +
     * This will be replaced by the TOC
     {:toc}
     
    -Coming soon. In the meantime, check our list of [open issues on 
JIRA](https://issues.apache.org/jira/browse/FLINK-1748?jql=component%20%3D%20%22Machine%20Learning%20Library%22%20AND%20project%20%3D%20FLINK%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20priority%20DESC)
    +## Getting Started
    +
    +To get started, first read Flink's [contribution 
guide](http://flink.apache.org/how-to-contribute.html).
    +Everything from this guide also applies to FlinkML.
    +
    +## Pick a Topic
    +
    +If you are looking for some new ideas, then you should check out the list 
of [unresolved issues on 
JIRA](https://issues.apache.org/jira/issues/?jql=component%20%3D%20%22Machine%20Learning%20Library%22%20AND%20project%20%3D%20FLINK%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20priority%20DESC).
    +Once you decide to contribute to one of these issues, you should take 
ownership of it and track your progress with this issue.
    +That way, the other contributors know the state of the different issues 
and redundant work is avoided.
    +
    +If you already know what you want to contribute to FlinkML, all the better.
    +It is still advisable to create a JIRA issue for your idea to tell the 
Flink community what you want to do, though.
    +
    +## Testing
    +
    +New contributions should come with tests to verify the correct behavior of 
the algorithm.
    +The tests help to maintain the algorithm's correctness throughout code 
changes, e.g. refactorings.
    +
    +We distinguish between unit tests, which are executed during Maven's test 
phase, and integration tests, which are executed during Maven's verify phase.
    +Maven automatically makes this distinction by using the following naming 
rule:
    +All test cases whose class name ends with a suffix matching the regular 
expression `(IT|Integration)(Test|Suite|Case)` are considered integration 
tests.
    +The rest are considered unit tests and should only test behavior which is 
local to the component under test.
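    +
    +To illustrate the naming rule, the following sketch applies the suffix
    +pattern to a few class names.
    +The helper `TestNaming` is hypothetical and not part of Flink; the actual
    +classification is done by the Maven build configuration, which may anchor
    +the pattern differently.
    +
    +{% highlight scala %}
    +object TestNaming {
    +  // Suffix pattern quoted from the naming rule above. The leading ".*"
    +  // is an assumption so that the whole class name can be matched.
    +  private val integrationSuffix =
    +    ".*(IT|Integration)(Test|Suite|Case)".r
    +
    +  def isIntegrationTest(className: String): Boolean =
    +    integrationSuffix.pattern.matcher(className).matches()
    +}
    +{% endhighlight %}
    +
    +Under this sketch, `ExampleITSuite` counts as an integration test, whereas
    +`MeanTransformerSuite` counts as a unit test.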
    +
    +An integration test is a test which requires the full Flink system to be 
started.
    +In order to do that properly, all integration test cases have to mix in 
the trait `FlinkTestBase`.
    +This trait will set the right `ExecutionEnvironment` so that the test will 
be executed on a special `FlinkMiniCluster` designated for testing purposes.
    +Thus, an integration test could look like the following:
    +
    +{% highlight scala %}
    +class ExampleITSuite extends FlatSpec with FlinkTestBase {
    +  behavior of "An example algorithm"
    +  
    +  it should "do something" in {
    +    ...
    +  }
    +}
    +{% endhighlight %}
    +
    +The test style does not have to be `FlatSpec` but can be any other 
ScalaTest `Suite` subclass. 
    +
    +## Documentation
    +
    +When contributing new algorithms, you are required to add code comments 
describing how the algorithm works and the parameters with which the user can 
control its behavior.
    +Additionally, we would like to encourage contributors to add this 
information to the online documentation.
    +The online documentation for FlinkML's components can be found in the 
directory `docs/libs/ml`.
    +
    +Every new algorithm is described by a single markdown file.
    +This file should contain at least the following points:
    +
    +1. What does the algorithm do
    +2. How does the algorithm work (or reference to description) 
    +3. Parameter description with default values
    +4. Code snippet showing how the algorithm is used
    +
    +In order to use LaTeX syntax in the markdown file, you have to include 
`mathjax: include` in the YAML front matter.
    + 
    +{% highlight yaml %}
    +---
    +mathjax: include
    +title: Example title
    +---
    +{% endhighlight %}
    +
    +In order to use displayed mathematics, you have to put your LaTeX code in 
`$$ ... $$`.
    +For in-line mathematics, use `$ ... $`.
    +Additionally, some predefined LaTeX commands are included in the scope of 
your markdown file.
    +See `docs/_include/latex_commands.html` for the complete list of 
predefined LaTeX commands.
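    +
    +As an illustration, a displayed formula for the sample mean could be
    +written as follows.
    +The formula is only an example and is not taken from the FlinkML
    +documentation.
    +
    +{% highlight latex %}
    +$$ \mu = \frac{1}{n} \sum_{i=1}^{n} x_i $$
    +{% endhighlight %}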
    +
    +## Contributing
    +
    +Once you have implemented the algorithm with adequate test coverage and 
added documentation, you are ready to open a pull request.
    +Details of how to open a pull request can be found 
[here](http://flink.apache.org/how-to-contribute.html#contributing-code--documentation).
 
    +
    +## How to Implement a Pipeline Operator
    +
    +FlinkML follows the principle of making machine learning as easy and 
accessible as possible.
    +Therefore, it supports a flexible pipelining mechanism which allows users 
to quickly define their analysis pipelines consisting of a multitude of 
different components.
    +A pipeline operator is either a `Transformer` or a `Predictor`.
    +A `Transformer` can be fitted to training data and transforms data from 
one format into another format.
    +A scaler which changes the mean and variance of its input data according 
to the mean and variance of some training data is an example of a 
`Transformer`.
    +In contrast, a `Predictor` encapsulates a data model and the corresponding 
logic to train it.
    +Once a `Predictor` has trained the model, it can be used to make new 
predictions.
    +A support vector machine which is first trained to obtain the support 
vectors and then used to classify data points is an example of a `Predictor`.
    +A general description of FlinkML's pipelining can be found 
[here]({{site.baseurl}}/libs/ml/pipelines.html).
    +In order to support the pipelining, algorithms have to adhere to a certain 
design pattern, which we will describe next.
    +
    +Let's assume that we want to implement a pipeline operator which changes 
the mean of the data.
    +First, we have to consider which type of pipeline operator it is.
    +Since centering data is a common preprocessing step in any analysis 
pipeline, we will implement it as a `Transformer`.
    +Therefore, we first create a `MeanTransformer` class which inherits from 
`Transformer`:
    +
    +{% highlight scala %}
    +class MeanTransformer extends Transformer[MeanTransformer] {}
    +{% endhighlight %}
    +
    +Since we want to be able to configure the mean of the resulting data, we 
have to add a configuration parameter.
    +
    +{% highlight scala %}
    +class MeanTransformer extends Transformer[MeanTransformer] {
    +  // Returns this instance so that setter calls can be chained.
    +  def setMean(mean: Double): this.type = {
    +    parameters.add(MeanTransformer.Mean, mean)
    +    this
    +  }
    +}
    +
    +object MeanTransformer {
    +  case object Mean extends Parameter[Double] {
    +    override val defaultValue: Option[Double] = Some(0.0)
    +  }
    +  
    +  def apply(): MeanTransformer = new MeanTransformer
    +}
    +{% endhighlight %}
    +
    +Parameters are defined in the companion object of the transformer class 
and extend the `Parameter` class.
    +The default value will be used if no other value has been set by the user 
of this component.
    +If no default value has been specified, meaning that `defaultValue = 
None`, then the algorithm has to handle this situation accordingly.
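    +
    +The lookup order described above can be sketched with simplified
    +stand-ins.
    +The `Parameter` and `ParameterMap` types below are assumptions made for
    +illustration and are not the FlinkML classes, and `Iterations` is a
    +hypothetical parameter without a default value.
    +
    +{% highlight scala %}
    +import scala.collection.mutable
    +
    +// Simplified stand-ins, NOT the FlinkML classes.
    +trait Parameter[T] { val defaultValue: Option[T] }
    +
    +class ParameterMap {
    +  private val values = mutable.Map[Parameter[_], Any]()
    +
    +  def add[T](parameter: Parameter[T], value: T): ParameterMap = {
    +    values(parameter) = value
    +    this
    +  }
    +
    +  // An explicitly set value wins, then the default value, else None.
    +  def get[T](parameter: Parameter[T]): Option[T] =
    +    values.get(parameter)
    +      .map(_.asInstanceOf[T])
    +      .orElse(parameter.defaultValue)
    +}
    +
    +case object Mean extends Parameter[Double] {
    +  override val defaultValue: Option[Double] = Some(0.0)
    +}
    +
    +// Hypothetical parameter without a default value.
    +case object Iterations extends Parameter[Int] {
    +  override val defaultValue: Option[Int] = None
    +}
    +{% endhighlight %}
    +
    +In this sketch, looking up `Mean` on an empty map yields the default
    +value, while looking up `Iterations` yields `None`, which the calling
    +algorithm has to handle itself.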
    +
    +We can now instantiate a `MeanTransformer` object and set the mean value 
of the transformed data.
    +But we still have to implement how the transformation works.
    +The workflow can be separated into two phases.
    +Within the first phase, the transformer learns the mean of the given 
training data.
    +This knowledge can then be used in the second phase to transform the 
provided data with respect to the configured resulting mean value.
    +
    +The learning of the mean can be implemented within the `fit` operation of 
a `Transformer`.
    +Within the `fit` operation, a pipeline component is trained with respect 
to the given training data.
    +The algorithm is, however, **not** implemented by overriding the `fit` 
method but by providing an implementation of a corresponding `FitOperation` for 
the correct type.
    +Taking a look at the definition of the `fit` method in `Estimator`, which 
is the parent class of `Transformer`, reveals why this is the case.
    +
    +{% highlight scala %}
    +trait Estimator[Self] extends WithParameters with Serializable {
    +  that: Self =>
    +
    +  def fit[Training](
    +      training: DataSet[Training],
    +      fitParameters: ParameterMap = ParameterMap.Empty)
    +      (implicit fitOperation: FitOperation[Self, Training]): Unit = {
    +    FlinkMLTools.registerFlinkMLTypes(training.getExecutionEnvironment)
    +    fitOperation.fit(this, fitParameters, training)
    +  }
    +}
    +{% endhighlight %}
    +
    +We see that the `fit` method is called with an input data set of type 
`Training`, an optional parameter map and, in the second parameter list, an 
implicit parameter of type `FitOperation`.
    +Within the body of the function, first some machine learning types are 
registered and then the `fit` method of the `FitOperation` parameter is called.
    +The instance passes itself, the parameter map and the training data set as 
parameters to the method.
    +Thus, all the program logic takes place within the `FitOperation`.
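    +
    +The delegation pattern can be reproduced without Flink in a minimal
    +sketch.
    +The traits below are simplified stand-ins and not the FlinkML traits:
    +`Seq` replaces `DataSet`, the parameter map is left out, and `Counter` is
    +a hypothetical estimator.
    +
    +{% highlight scala %}
    +// Simplified stand-ins, NOT the FlinkML traits.
    +trait FitOperation[Self, Training] {
    +  def fit(instance: Self, input: Seq[Training]): Unit
    +}
    +
    +trait Estimator[Self] { that: Self =>
    +  // The logic lives in the implicitly resolved FitOperation.
    +  def fit[Training](training: Seq[Training])(
    +      implicit fitOperation: FitOperation[Self, Training]): Unit =
    +    fitOperation.fit(this, training)
    +}
    +
    +// Hypothetical estimator which remembers the training set size.
    +class Counter extends Estimator[Counter] {
    +  var count: Option[Int] = None
    +}
    +
    +object Counter {
    +  // Found through the implicit scope of Counter's companion object.
    +  implicit val fitDoubles: FitOperation[Counter, Double] =
    +    new FitOperation[Counter, Double] {
    +      override def fit(instance: Counter, input: Seq[Double]): Unit =
    +        instance.count = Some(input.size)
    +    }
    +}
    +{% endhighlight %}
    +
    +Calling `fit` on a `Counter` with a `Seq[Double]` compiles because a
    +matching `FitOperation` is in implicit scope.
    +Calling it with a `Seq[String]` is rejected by the compiler, since no
    +`FitOperation[Counter, String]` exists in this sketch.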
    +
    +The `FitOperation` has two type parameters.
    +The first defines the pipeline operator type for which this `FitOperation` 
shall work and the second type parameter defines the type of the data set 
elements.
    +If we first wanted to implement the `MeanTransformer` to work on 
`DenseVector`, we would, thus, have to provide an implementation for 
`FitOperation[MeanTransformer, DenseVector]`.
    + 
    +{% highlight scala %}
    +val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, 
DenseVector] {
    +  override def fit(instance: MeanTransformer, fitParameters: ParameterMap, 
input: DataSet[DenseVector]) : Unit = {
    +    import org.apache.flink.ml.math.Breeze._
    +    val meanTrainingData: DataSet[DenseVector] = input
    +      .map{ x => (x.asBreeze, 1) }
    +      .reduce{
    +        (left, right) => 
    +          (left._1 + right._1, left._2 + right._2) 
    +      }
    +      .map{ p => (p._1/p._2).fromBreeze }
    +  }
    +}
    +{% endhighlight %}
    +
    +A `FitOperation[T, I]` has a `fit` method which is called with an instance 
of type `T`, a parameter map and an input `DataSet[I]`.
    +In our case `T=MeanTransformer` and `I=DenseVector`.
    +The parameter map is necessary if our fit step depends on some parameter 
values which were not given directly at creation time of the `Transformer`.
    +The `FitOperation` of the `MeanTransformer` sums up the `DenseVector` 
instances of the given input data set and divides the result by the total 
number of vectors.
    +That way, we obtain a `DataSet[DenseVector]` with a single element which 
is the mean value.
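    +
    +The same sum-and-count arithmetic can be tried on plain Scala
    +collections.
    +This is only a sketch of the computation: `MeanSketch` is a hypothetical
    +helper, `Array[Double]` stands in for `DenseVector` and `Seq` for
    +`DataSet`.
    +
    +{% highlight scala %}
    +object MeanSketch {
    +  // Pair every vector with a count of 1, add the pairs element-wise
    +  // and divide the summed vector by the total count.
    +  // Like reduce in general, this fails on an empty input.
    +  def mean(vectors: Seq[Array[Double]]): Array[Double] = {
    +    val (sum, count) = vectors
    +      .map(v => (v, 1))
    +      .reduce { (left, right) =>
    +        (left._1.zip(right._1).map { case (a, b) => a + b },
    +          left._2 + right._2)
    +      }
    +    sum.map(_ / count)
    +  }
    +}
    +{% endhighlight %}
    +
    +For the two vectors `(1.0, 2.0)` and `(3.0, 4.0)`, the sketch returns
    +`(2.0, 3.0)`.
    +It only illustrates the arithmetic and does not replace the
    +`FitOperation` shown before it.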
    +
    +But if we look closely at the implementation, we see that the result of 
the mean computation is never stored anywhere.
    +If we want to use this knowledge in a later step to adjust the mean of 
some other input, we have to keep it around.
    +And here is where the parameter of type `MeanTransformer` which is given 
to the `fit` method comes into play.
    +We can use this instance to store state, which is used by a subsequent 
`transform` operation which works on the same object.
    +But first we have to extend `MeanTransformer` by a member field and then 
adjust the `FitOperation` implementation.
    +
    +{% highlight scala %}
    +class MeanTransformer extends Transformer[MeanTransformer] {
    +  var meanOption: Option[DataSet[DenseVector]] = None
    +
    +  def setMean(mean: Double): this.type = {
    +    parameters.add(MeanTransformer.Mean, mean)
    +    this
    +  }
    +}
    +
    +val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, 
DenseVector] {
    +  override def fit(instance: MeanTransformer, fitParameters: ParameterMap, 
input: DataSet[DenseVector]) : Unit = {
    +    import org.apache.flink.ml.math.Breeze._
    +    
    +    instance.meanOption = Some(input
    +      .map{ x => (x.asBreeze, 1) }
    +      .reduce{
    +        (left, right) => 
    +          (left._1 + right._1, left._2 + right._2) 
    +      }
    +      .map{ p => (p._1/p._2).fromBreeze })
    +  }
    +}
    +{% endhighlight %}
    +
    +If we look at the `transform` method in `Transformer`, we will see that we 
also need an implementation of `TransformOperation`.
    +A possible mean transforming implementation could look like the following.
    --- End diff --
    
    :+1:


> Add contribution guide for FlinkML
> ----------------------------------
>
>                 Key: FLINK-2073
>                 URL: https://issues.apache.org/jira/browse/FLINK-2073
>             Project: Flink
>          Issue Type: New Feature
>          Components: Documentation, Machine Learning Library
>            Reporter: Theodore Vasiloudis
>            Assignee: Till Rohrmann
>             Fix For: 0.9
>
>
> We need a guide for contributions to FlinkML in order to encourage the 
> extension of the library, and provide guidelines for developers.
> One thing that should be included is a step-by-step guide to create a 
> transformer, or other Estimator



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
