[
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14231760#comment-14231760
]
Ufuk Celebi commented on FLINK-1297:
------------------------------------
It may make sense to collect these numbers on the {{RecordWriter}} level (1),
which is responsible for serializing data into network buffers and routing them
to the respective network output channels. The user-facing {{Collector}}
implementations are wrappers around one or multiple {{RecordWriter}} instances.
At this level you would have access to the number of emitted records (total and
split by channels (target partitioning groups)) and the size of each record
(total and split by channels).
The interface is pretty straight-forward: for each data element (historically
called a record) the {{emit}} method is called. This method checks to which of
the available output channels the element should be shipped. Selection is done
by calling the {{ChannelSelector.selectChannels(T, int)}} method, which returns
an array of channel indexes. The record is then serialized into the respective
buffers of the channels.
If you think that this approach is reasonable for you and you need more
details, we can also have a Skype call tomorrow. :)
A shortcoming at this layer is that you only deal with {{IOReadableWriteable}},
which might be a limiting factor.
(1)
https://github.com/apache/incubator-flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RecordWriter.java
> Add support for tracking statistics of intermediate results
> -----------------------------------------------------------
>
> Key: FLINK-1297
> URL: https://issues.apache.org/jira/browse/FLINK-1297
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Runtime
> Reporter: Alexander Alexandrov
> Assignee: Alexander Alexandrov
> Fix For: 0.8-incubating
>
> Original Estimate: 1,008h
> Remaining Estimate: 1,008h
>
> One of the major problems related to the optimizer at the moment is the lack
> of proper statistics.
> With the introduction of staged execution, it is possible to instrument the
> runtime code with a statistics facility that collects the required
> information for optimizing the next execution stage.
> I would therefore like to contribute code that can be used to gather basic
> statistics for the (intermediate) result of dataflows (e.g. min, max, count,
> count distinct) and make them available to the job manager.
> Before I start, I would like to hear some feedback form the other users.
> In particular, to handle skew (e.g. on grouping) it might be good to have
> some sort of detailed sketch about the key distribution of an intermediate
> result. I am not sure whether a simple histogram is the most effective way to
> go. Maybe somebody would propose another lightweight sketch that provides
> better accuracy.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)