[ https://issues.apache.org/jira/browse/FLINK-39131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18060735#comment-18060735 ]

Salva commented on FLINK-39131:
-------------------------------

An alternative would be to support side inputs.

> Multi-Input Processors
> ----------------------
>
>                 Key: FLINK-39131
>                 URL: https://issues.apache.org/jira/browse/FLINK-39131
>             Project: Flink
>          Issue Type: New Feature
>          Components: API / DataStream
>            Reporter: Salva
>            Priority: Major
>
> As a DataStream API user, I would like to have a built-in primitive that 
> allows me to process multiple (N>2) inputs within a single operator, that is, 
> going beyond the current co-processors. This would not only dramatically 
> improve the user ergonomics when writing multi-way joins (sparing users the 
> tedium of manually unioning/multiplexing all the inputs into a custom tuple) 
> but also help prevent the state-explosion issue recently addressed for SQL in 
> [FLIP-516|https://cwiki.apache.org/confluence/display/FLINK/FLIP-516%3A+Multi-Way+Join+Operator].
> The Operator API already contains all the necessary building blocks, and some 
> users have reported successfully leveraging them, e.g., see the replies in 
> this 
> [thread|https://lists.apache.org/thread/mt4pb9z55d4p5m0wdwrsoxnpcc5hon0n].
> I'm personally doing this too, so I wonder whether something like this could 
> be officially supported—or at least documented.
> The basic idea, providing something that feels like a 
> {{(Keyed)MultiProcessFunction}}, can be illustrated with the following 
> test:
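> {code:java}
> import java.util.Comparator;
> import java.util.List;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
> import org.junit.Before;
> import org.junit.Test;
> import static org.junit.Assert.assertEquals;
> // X, Y, Z, Out, KeyedMultiInputOperatorBuilder and KeyedThreeInputOperator
> // are user-defined classes (not part of Flink).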
> public class MultiInputITCase {
>   StreamExecutionEnvironment env;
>   DataStream<X> xs;
>   DataStream<Y> ys;
>   DataStream<Z> zs;
>   @Before
>   public void setup() {
>     env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(2);
>     xs = env.fromData(new X("a", 10), new X("b", 100));
>     ys = env.fromData(new Y("a", 20), new Y("b", 200));
>     zs = env.fromData(new Z("a", 30), new Z("b", 300));
>   }
>   @Test
>   public void testKeyedThreeWayJoin() throws Exception {
>     KeyedMultiInputOperatorBuilder<String, Out> builder =
>         new KeyedMultiInputOperatorBuilder<>(
>             env,
>             KeyedThreeInputOperator.class,
>             TypeInformation.of(Out.class),
>             Types.STRING
>         );
>     builder
>         .addInput(xs, X::getKey)
>         .addInput(ys, Y::getKey)
>         .addInput(zs, Z::getKey);
>     DataStream<Out> joined = builder.build("xyz-join");
>     TestListResultSink<Out> resultSink = new TestListResultSink<>();
>     joined.addSink(resultSink);
>     env.execute("Keyed Three-Way Join Test");
>     List<Out> result = resultSink.getResult();
>     result.sort(Comparator.comparing(Out::getId).thenComparing(Out::getSum));
>     assertEquals(2, result.size());
>     assertEquals(new Out("a", 60), result.get(0));
>     assertEquals(new Out("b", 600), result.get(1));
>   }
> } {code}
> with the user-defined processor looking like this:
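> {code:java}
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
> import org.apache.flink.util.Collector;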
> /**
>  * Example of three-way keyed join:
>  * <p>
>  * X(id, x), Y(id, y), Z(id, z)  ->  Out(id, x + y + z)
>  * <p>
>  * Uses the KeyedMultiInputOperator3 base to get CoProcess-style handlers:
>  * processElement1(X value, Context ctx, Collector<Out> out)
>  * processElement2(Y value, Context ctx, Collector<Out> out)
>  * processElement3(Z value, Context ctx, Collector<Out> out)
>  */
> public class KeyedThreeInputOperator
>     extends KeyedMultiInputOperator3<X, Y, Z, Out> {
>   private transient ValueState<Integer> lastX;
>   private transient ValueState<Integer> lastY;
>   private transient ValueState<Integer> lastZ;
>   public KeyedThreeInputOperator(StreamOperatorParameters<Out> params) {
>     super(params);
>   }
>   @Override
>   public void open() throws Exception {
>     super.open();
>     var store = getKeyedStateStore()
>         .orElseThrow(() -> new IllegalStateException(
>             "KeyedThreeInputOperator requires keyed state"));
>     lastX = store.getState(new ValueStateDescriptor<>("x", Types.INT));
>     lastY = store.getState(new ValueStateDescriptor<>("y", Types.INT));
>     lastZ = store.getState(new ValueStateDescriptor<>("z", Types.INT));
>   }
>   //
>   // Input channel callbacks
>   //
>   @Override
>   protected void processElement1(X x, Context ctx, Collector<Out> out)
>       throws Exception {
>     lastX.update(x.getX());
>     join(ctx, out);
>   }
>   @Override
>   protected void processElement2(Y y, Context ctx, Collector<Out> out)
>       throws Exception {
>     lastY.update(y.getY());
>     join(ctx, out);
>   }
>   @Override
>   protected void processElement3(Z z, Context ctx, Collector<Out> out)
>       throws Exception {
>     lastZ.update(z.getZ());
>     join(ctx, out);
>   }
>   //
>   // Join logic (take the sum of the 3 current values for each key)
>   //
>   private void join(Context ctx, Collector<Out> out) throws Exception {
>     Integer a = lastX.value();
>     Integer b = lastY.value();
>     Integer c = lastZ.value();
>     if (a != null && b != null && c != null) {
>       String key = ctx.getCurrentKey(String.class);
>       out.collect(new Out(key, a + b + c));
>     }
>   }
> } {code}
> This is the simplest (N=3) case, but in my library I have generated base 
> classes up to N=25 (KeyedMultiInputOperator3...KeyedMultiInputOperator25), 
> along the lines of what's done for 
> [Tuples|https://github.com/apache/flink/tree/master/flink-core-api/src/main/java/org/apache/flink/api/java/tuple].
> I can provide more details, but this should be enough for the sake of 
> discussion/triage.
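For reference, the manual workaround described in the issue (wrapping every input in a custom tuple, unioning the streams, and demultiplexing in a single function) can be modeled in plain Java. This is a stdlib-only sketch, not Flink code: `Tagged`, `Multiplex`, `wrap` and `dispatch` are hypothetical names, each `List` stands in for a `DataStream`, and `union` stands in for `DataStream#union`. In a real job the same shape would be a `map` per input into the wrapper type, then `union`, `keyBy`, and one `KeyedProcessFunction` that switches on the tag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Element types mirroring the issue's example (simplified to records).
record X(String key, int x) {}
record Y(String key, int y) {}
record Z(String key, int z) {}

// Hypothetical "custom tuple": every input is wrapped into this one type so the
// inputs can be unioned; `tag` records which input the payload came from.
record Tagged(int tag, String key, Object payload) {}

final class Multiplex {
    // Stand-in for a per-input map(...) into the common wrapper type.
    static <T> List<Tagged> wrap(List<T> input, int tag, Function<T, String> keyOf) {
        List<Tagged> out = new ArrayList<>();
        for (T value : input) {
            out.add(new Tagged(tag, keyOf.apply(value), value));
        }
        return out;
    }

    // Stand-in for DataStream#union: concatenates the wrapped inputs.
    @SafeVarargs
    static List<Tagged> union(List<Tagged>... inputs) {
        List<Tagged> all = new ArrayList<>();
        for (List<Tagged> input : inputs) {
            all.addAll(input);
        }
        return all;
    }

    // The single downstream function must switch on the tag and cast back.
    static String dispatch(Tagged t) {
        return switch (t.tag()) {
            case 1 -> "x=" + ((X) t.payload()).x();
            case 2 -> "y=" + ((Y) t.payload()).y();
            case 3 -> "z=" + ((Z) t.payload()).z();
            default -> throw new IllegalArgumentException("unknown tag " + t.tag());
        };
    }
}
```

The hand-assigned tag numbers and the casts in `dispatch` are the boilerplate that grows with every additional input, which is what a built-in N-input primitive would hide.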

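To make the expected test results concrete (`Out("a", 60)` and `Out("b", 600)`), the following plain-Java model mimics the join semantics of `KeyedThreeInputOperator` from the issue: one "last value" slot per input and key, and an emission whenever all three slots for the current key are set. It is a sketch under simplifying assumptions: `ThreeWayLatestJoin` is a hypothetical name, `HashMap`s stand in for Flink's keyed `ValueState`, and elements are fed in one fixed order, whereas in Flink the interleaving of elements across inputs is not guaranteed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Output record mirroring the issue's Out(id, sum).
record Out(String id, int sum) {}

// Hypothetical model of KeyedThreeInputOperator's "latest value per key" join.
final class ThreeWayLatestJoin {
    // One map per input stands in for a keyed ValueState: key -> last value seen.
    private final Map<String, Integer> lastX = new HashMap<>();
    private final Map<String, Integer> lastY = new HashMap<>();
    private final Map<String, Integer> lastZ = new HashMap<>();
    // Stand-in for the Collector<Out>: everything emitted, in order.
    final List<Out> emitted = new ArrayList<>();

    void processElement1(String key, int x) { lastX.put(key, x); join(key); }
    void processElement2(String key, int y) { lastY.put(key, y); join(key); }
    void processElement3(String key, int z) { lastZ.put(key, z); join(key); }

    // Emit the sum only once all three inputs have produced a value for this key.
    private void join(String key) {
        Integer a = lastX.get(key);
        Integer b = lastY.get(key);
        Integer c = lastZ.get(key);
        if (a != null && b != null && c != null) {
            emitted.add(new Out(key, a + b + c));
        }
    }

    public static void main(String[] args) {
        ThreeWayLatestJoin op = new ThreeWayLatestJoin();
        // Same data as the issue's test, fed input by input.
        op.processElement1("a", 10);
        op.processElement1("b", 100);
        op.processElement2("a", 20);
        op.processElement2("b", 200);
        op.processElement3("a", 30);
        op.processElement3("b", 300);
        System.out.println(op.emitted); // [Out[id=a, sum=60], Out[id=b, sum=600]]
    }
}
```

Each key emits once here only because every input sends one element per key; a later element on any input for an already-complete key (say a second `X` for `"a"`) triggers another emission with the updated sum.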

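The issue mentions generating `KeyedMultiInputOperator3` through `KeyedMultiInputOperator25`, similar to Flink's `Tuple` classes. The actual generator is not shown, so the following is only a hypothetical sketch of the repetitive part: one typed abstract `processElementK` callback per input, with signatures copied from the example operator's `processElement1..3`. `OperatorBaseGenerator` is an invented name, `Context` and `Collector` appear only as text in the emitted source, and a real base class would presumably also need the wiring to Flink's multiple-input operator API (for example `MultipleInputStreamOperator`), which is omitted here.

```java
import java.util.StringJoiner;

// Hypothetical generator for KeyedMultiInputOperatorN skeletons, in the spirit of
// Flink's generated Tuple classes. Only the per-input callbacks are emitted.
final class OperatorBaseGenerator {
    static String generate(int n) {
        if (n < 3) {
            // One and two inputs are already served by the (co-)process functions.
            throw new IllegalArgumentException("N must be at least 3, got " + n);
        }
        // Type parameters T1..TN for the inputs, then OUT for the output.
        StringJoiner typeParams = new StringJoiner(", ", "<", ", OUT>");
        for (int i = 1; i <= n; i++) {
            typeParams.add("T" + i);
        }
        StringBuilder src = new StringBuilder();
        src.append("public abstract class KeyedMultiInputOperator").append(n)
           .append(typeParams).append(" {\n");
        // One typed callback per input, matching processElement1..3 in the example.
        for (int i = 1; i <= n; i++) {
            src.append("  protected abstract void processElement").append(i)
               .append("(T").append(i)
               .append(" value, Context ctx, Collector<OUT> out) throws Exception;\n");
        }
        src.append("}\n");
        return src.toString();
    }

    public static void main(String[] args) {
        System.out.print(generate(3));
    }
}
```

Rejecting N below 3 is a choice made for this sketch, since one and two inputs are already covered by the existing process and co-process functions.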

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
