[
https://issues.apache.org/jira/browse/FLINK-87?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14076251#comment-14076251
]
Stephan Ewen commented on FLINK-87:
-----------------------------------
That is a nice idea.
Can you specify a bit more how the selector function should look like?
When thinking how to implement this (also in the batch API) I only found a good
way when the naming and functions were reveres in the order.
{code}
SplitDataStream<Long> split = env.generateSequence(1, 6).directTo(new
MySelector()); // maybe call it "split" instead of "direct to"
DataStream<Long> even = split.select("even");
even.map(new PlusTwo()).addSink(new EvenSink());
// or concise
split.select("odd").map(new PlusTwo()).addSink(newOddSink());
{code}
This wires the part for the even numbers to the output stream that the selector
feeds when it returns "even" (and symmetrically for "odd").
> Extend collectors to specify target sink
> ----------------------------------------
>
> Key: FLINK-87
> URL: https://issues.apache.org/jira/browse/FLINK-87
> Project: Flink
> Issue Type: Improvement
> Components: Java API
> Reporter: Robert Metzger
> Labels: github-import
> Fix For: pre-apache
>
>
> This is an enhancement proposal for Stratosphere.
> It is possible to have multiple outputs for a given PACT, like the figure
> below illustrates:
> (SRC)
> |
> REDUCE
> / | \
> +-----+ | +-------+
> | | |
> (SINK A) | (SINK C)
> (SINK B)
> All records are going to all sinks.
> It would be preferable sometimes to select a sink for a record .. For example
> each group into a separate file (yes, I know, one could add a filter before
> each sink)
> We could add another collect() method like this
> public void collect(int sinkId, PactRecord record)
> The current collect() is quite simple:
> public void collect(PactRecord record)
> {
> for (int i = 0; i < writers.length; i++) {
> this.writers[i].emit(record);
> }
> }
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/stratosphere/stratosphere/issues/87
> Created by: [rmetzger|https://github.com/rmetzger]
> Labels: enhancement,
> Assignee: [rmetzger|https://github.com/rmetzger]
> Created at: Wed Sep 11 17:49:47 CEST 2013
> State: open
--
This message was sent by Atlassian JIRA
(v6.2#6252)