[ https://issues.apache.org/jira/browse/FLINK-1869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14501320#comment-14501320 ]
Gyula Fora commented on FLINK-1869:
-----------------------------------

Hey,

The purpose of operator chaining is to let the optimizer layer co-locate the discretizer operator with the preceding operator. For instance, if you have a source with parallelism 2 and a discretizer with the same parallelism (and no data shuffling in between), the source passes its output directly to the discretizer. This greatly improves throughput in these cases.

You are right, the only thing we need to do here is make the Discretizer classes extend ChainableStreamOperator, which implements the Collector interface. This allows the discretizer to receive inputs through the collect method. You should override the default implementation of collect to make it work properly (or, which would be nicer, refactor the discretizer methods so that they work with the default collect implementation).

The WindowIntegrationTest actually fails if you just change the discretizer to extend the chainable operator. But you should also confirm the behaviour by modifying some of the discretizer tests to receive their inputs through collect.

Gyula

> Make StreamDiscretizers chainable
> ---------------------------------
>
>                 Key: FLINK-1869
>                 URL: https://issues.apache.org/jira/browse/FLINK-1869
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>            Reporter: Gyula Fora
>
> Currently the different StreamDiscretizer operators (StreamDiscretizer,
> GroupedStreamDiscretizer, GroupedActiveDiscretizer) are non-chainable.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
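To illustrate the chaining idea from the comment (an upstream operator handing records to a discretizer through a collect call), here is a minimal, self-contained Java sketch. It is not Flink's ChainableStreamOperator API: the `Collector` interface, `CountDiscretizer`, `runSource`, and `run` below are all hypothetical names invented for illustration, and the discretizer uses a simple count-based window as an assumed policy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChainingSketch {

    // Hypothetical minimal sink interface (not Flink's class): whoever holds
    // a reference can push records into it.
    interface Collector<T> {
        void collect(T record);

        void close();
    }

    // Hypothetical count-based discretizer. Because it implements Collector,
    // a chained upstream operator hands records to it with a plain method
    // call instead of sending them over a channel.
    static class CountDiscretizer<T> implements Collector<T> {
        private final int windowSize;
        private final Collector<List<T>> output;
        private List<T> buffer = new ArrayList<>();

        CountDiscretizer(int windowSize, Collector<List<T>> output) {
            if (windowSize <= 0) {
                throw new IllegalArgumentException("windowSize must be positive");
            }
            this.windowSize = windowSize;
            this.output = output;
        }

        @Override
        public void collect(T record) {
            buffer.add(record);
            // Emit a window as soon as it is full, then start a fresh buffer.
            if (buffer.size() == windowSize) {
                output.collect(buffer);
                buffer = new ArrayList<>();
            }
        }

        @Override
        public void close() {
            // Flush a trailing partial window when the input ends.
            if (!buffer.isEmpty()) {
                output.collect(buffer);
                buffer = new ArrayList<>();
            }
            output.close();
        }
    }

    // Hypothetical source: pushes every element into the chained collector.
    static void runSource(List<Integer> data, Collector<Integer> out) {
        for (Integer value : data) {
            out.collect(value);
        }
        out.close();
    }

    // Wires source -> discretizer -> in-memory sink, all in one thread.
    static List<List<Integer>> run(List<Integer> data, int windowSize) {
        List<List<Integer>> windows = new ArrayList<>();
        Collector<List<Integer>> sink = new Collector<List<Integer>>() {
            @Override
            public void collect(List<Integer> window) {
                windows.add(window);
            }

            @Override
            public void close() {
                // Nothing to release for an in-memory sink.
            }
        };
        runSource(data, new CountDiscretizer<Integer>(windowSize, sink));
        return windows;
    }

    public static void main(String[] args) {
        // Prints [[1, 2], [3, 4], [5]]
        System.out.println(run(Arrays.asList(1, 2, 3, 4, 5), 2));
    }
}
```

In this sketch the source and discretizer run in the same thread and each record crosses the boundary as an ordinary method call, which is the kind of hand-off chaining is meant to enable. The `runSource` driver also mirrors the testing suggestion in the comment: a unit test can feed a discretizer through collect and then inspect the emitted windows.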