Dear all,
A small update on my end regarding the implementation of the DataStream window
join in pyflink. Looking around in the code, I think we could apply the same
approach as the key_by method. There, the JKeyByKeySelector class is used as a
Java proxy to key the stream: behind the scenes, the user's key_selector is
applied in a process function that transforms the stream into tuples of (key,
value), and the Java KeyByKeySelector then extracts the field at position 0 as
the key.
Similarly, I defined a "generic" TupleJoinFunction which returns a
Tuple2<first, second>, and then apply the user's Python JoinFunction to that
output as a process function. Below is a code example of how it's done:
```
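from pyflink.common import Types
from pyflink.datastream import DataStream
from pyflink.java_gateway import get_gateway

# Java-side helpers: the key selector that reads field 0, and the generic join function.
JKeyByKeySelector = get_gateway().jvm.KeyByKeySelector
JTupleJoinFunction = (
    get_gateway().jvm.com.nw.flink.datastream.functions.TupleJoinFunction
)

joined_stream = DataStream(
    # Wrap each left record as Row(key, value) on the Python side.
    left.map(
        lambda r: __import__("pyflink").common.Row(left_key_selector(r), r),
        output_type=Types.ROW([left_key_type, left.get_type()]),
    )._j_data_stream
    .join(
        # Same wrapping for the right stream.
        right.map(
            lambda r: __import__("pyflink").common.Row(right_key_selector(r), r),
            output_type=Types.ROW([right_key_type, right.get_type()]),
        )._j_data_stream
    )
    # Both sides are keyed on field 0 of the wrapped Row.
    .where(JKeyByKeySelector())
    .equalTo(JKeyByKeySelector())
    .window(j_window_assigner)
    .apply(
        JTupleJoinFunction(),
        Types.TUPLE([left.get_type(), right.get_type()]).get_java_type_info(),
    )
    # Apply the user's Python join function to each (left, right) pair.
).map(lambda e: join_function(e[0], e[1]), output_type)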
```
And the Java implementation of the TupleJoinFunction:
```
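package com.nw.flink.datastream.functions;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

public class TupleJoinFunction implements JoinFunction<Row, Row, Tuple2<Row, Row>> {
    @Override
    public Tuple2<Row, Row> join(Row first, Row second) throws Exception {
        // Field 0 holds the key and field 1 the original record, so keep field 1.
        return new Tuple2<Row, Row>((Row) first.getField(1), (Row) second.getField(1));
    }
}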
```
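To make the data flow easier to follow, here is a minimal pure-Python sketch of
the same idea with no Flink involved. It only models the contents of a single
window (window assignment itself is not modelled), and all the names in it
(wrap_with_key, key_at_position_0, tuple_join, simulate_window_join, and the
sample records) are hypothetical stand-ins I made up for illustration, not
pyflink or Flink APIs:

```python
from collections import defaultdict


def wrap_with_key(records, key_selector):
    """Python-side map step: turn each record into a (key, value) pair."""
    return [(key_selector(r), r) for r in records]


def key_at_position_0(pair):
    """Stand-in for the Java KeyByKeySelector: the key is field 0."""
    return pair[0]


def tuple_join(first, second):
    """Stand-in for the Java TupleJoinFunction: keep field 1 of each side."""
    return (first[1], second[1])


def simulate_window_join(left, right, left_key_selector, right_key_selector,
                         join_function):
    """Join the records of ONE window; assumes both lists share that window."""
    left_pairs = wrap_with_key(left, left_key_selector)
    right_pairs = wrap_with_key(right, right_key_selector)

    # Index the right side by key so matching is a dictionary lookup.
    right_by_key = defaultdict(list)
    for pair in right_pairs:
        right_by_key[key_at_position_0(pair)].append(pair)

    # Pair up every left/right combination that shares a key.
    joined = [
        tuple_join(l, r)
        for l in left_pairs
        for r in right_by_key.get(key_at_position_0(l), [])
    ]

    # Final Python-side map step: apply the user's join function.
    return [join_function(e[0], e[1]) for e in joined]


orders = [{"user": 1, "item": "book"}, {"user": 2, "item": "pen"}]
clicks = [
    {"uid": 1, "page": "/home"},
    {"uid": 1, "page": "/cart"},
    {"uid": 3, "page": "/faq"},
]

result = simulate_window_join(
    orders,
    clicks,
    left_key_selector=lambda o: o["user"],
    right_key_selector=lambda c: c["uid"],
    join_function=lambda o, c: (o["item"], c["page"]),
)
print(result)
```

The point of the sketch is that the Java side never needs to know anything
about the user's key selector or join function: it only reads position 0 for
the key and position 1 for the payload, and everything user-defined stays in
Python.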
The only issue I see with this proposal is that the current Python
WindowAssigner implementation does not give access to the underlying Java
object. If someone with more expertise on the pyflink implementation has any
comments on this, they would be much appreciated.
Again, given a bit of guidance, I am willing to put in the work to open a PR
for the Python API to support this feature.
Best regards,
Hugo Polsinelli
________________________________
From: Hugo POLSINELLI <[email protected]>
Sent: Friday, 28 March 2025 10:44
To: [email protected] <[email protected]>
Subject: Pyflink improvements proposal
Dear all,
I've noticed that there are some missing functionalities in the Pyflink API
that I would be willing to implement. To name a few:
* Table API: WindowGroupedTable does not implement the flat_aggregate method
* DataStream API: No support for WindowJoin and JoinFunction (I saw there is an
  umbrella JIRA ticket for improving pyflink DataStream Window support
  <https://issues.apache.org/jira/browse/FLINK-26477> but unfortunately there
  is no mention of WindowJoins)
* Execution environment: No support for adding custom Job Listeners
I would be willing to propose a base implementation for each of the points
above and submit a PR, provided that someone with a bit more experience than me
on the Flink project is willing to point me in the right direction!
Feel free to let me know if you would be interested!
Best regards,
Hugo Polsinelli