[ 
https://issues.apache.org/jira/browse/FLINK-1869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14501320#comment-14501320
 ] 

Gyula Fora commented on FLINK-1869:
-----------------------------------

Hey,

The purpose of operator chaining is to let the optimizer layer co-locate the
discretizer operator with the preceding operator. For instance, if you have a
source with parallelism 2 and a discretizer with the same parallelism (and no
data shuffling in between), the source passes its output directly to the
discretizer. This greatly improves throughput in these cases.

You are right, the only thing we need to do here is make the Discretizer
classes extend ChainableStreamOperator, which implements the Collector
interface. This allows the discretizer to receive inputs through the collect
method. You should override the default implementation of collect to make it
work properly (or, better, refactor the discretizer methods so that they work
with the default collect implementation).
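
To illustrate the idea, here is a rough, self-contained sketch of how a
discretizer could receive its inputs through collect. It does not use the
actual Flink classes: the Collector interface, ChainableOperatorSketch and
CountDiscretizerSketch below are hypothetical stand-ins written only for this
example, and the count-based windowing is a made-up policy.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Collector interface mentioned above.
interface Collector<T> {
    void collect(T record);
}

// Hypothetical chainable base class (not the real ChainableStreamOperator).
// Being a Collector is what lets the preceding operator hand records over
// with a plain method call instead of going through a network channel.
abstract class ChainableOperatorSketch<IN, OUT> implements Collector<IN> {
    protected final Collector<OUT> output;

    ChainableOperatorSketch(Collector<OUT> output) {
        this.output = output;
    }

    // Per-record logic that every input path goes through.
    protected abstract void processRecord(IN record);

    // Default collect: simply delegate to the per-record logic.
    @Override
    public void collect(IN record) {
        processRecord(record);
    }
}

// Toy discretizer (assumed policy): emit a window every `windowSize` records.
class CountDiscretizerSketch<T> extends ChainableOperatorSketch<T, List<T>> {
    private final int windowSize;
    private List<T> buffer = new ArrayList<>();

    CountDiscretizerSketch(int windowSize, Collector<List<T>> output) {
        super(output);
        this.windowSize = windowSize;
    }

    @Override
    protected void processRecord(T record) {
        buffer.add(record);
        if (buffer.size() == windowSize) {
            output.collect(buffer);      // pass the finished window downstream
            buffer = new ArrayList<>();  // start a fresh window
        }
    }
}

public class ChainingSketch {
    public static void main(String[] args) {
        List<List<Integer>> windows = new ArrayList<>();
        CountDiscretizerSketch<Integer> discretizer =
                new CountDiscretizerSketch<Integer>(2, windows::add);

        // The "source" is chained: it calls collect on the discretizer directly.
        for (int i = 1; i <= 5; i++) {
            discretizer.collect(i);
        }
        System.out.println(windows); // prints [[1, 2], [3, 4]]
    }
}
```

With the per-record logic in a single method, the default collect
implementation is enough and the discretizer does not need its own override.
A test can also drive the operator purely through collect, as main does here.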

The WindowIntegrationTest actually fails if you just change the discretizer to
extend ChainableStreamOperator. But you should also confirm the behaviour by
modifying some of the discretizer tests to feed the inputs through collect.

Gyula



> Make StreamDiscretizers chainable
> ---------------------------------
>
>                 Key: FLINK-1869
>                 URL: https://issues.apache.org/jira/browse/FLINK-1869
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>            Reporter: Gyula Fora
>
> Currently the different StreamDiscretizer operators (StreamDiscretizer, 
> GroupedStreamDiscretizer, GroupedActiveDiscretizer) are non-chainable.



