Hi everyone,

Sorry for being so late with this request, but fixing a couple of downstream bugs had higher priority than this issue, and those bugs were also blocking it. Nevertheless, I would like to ask for permission to merge FLINK-19980 [1] into the 1.13 branch as an experimental feature. It adds the API methods `StreamTableEnvironment.fromChangelogStream` and `StreamTableEnvironment.toChangelogStream`.


It enables a smooth integration between the DataStream API and the Table API, for example:

// A changelog stream in which every row carries its change kind.
DataStream<Row> dataStream =
        env.fromElements(
                Row.ofKind(RowKind.INSERT, "alice", 12),
                Row.ofKind(RowKind.UPDATE_AFTER, "alice", 13),
                Row.ofKind(RowKind.UPDATE_AFTER, "alice", 14));
// Interpret the stream as an upsert changelog keyed by f0 and aggregate it.
tEnv
    .fromChangelogStream(
        dataStream,
        Schema.newBuilder().primaryKey("f0").build(),
        ChangelogMode.upsert()
    )
    .groupBy($("f0"))
    .select($("f0"), $("f1").sum())
    .execute()
    .print();
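
To make the upsert interpretation above concrete, here is a minimal plain-Java sketch of how such a changelog collapses to the latest value per key. It is not Flink code and not how the planner implements it; `Kind` and `Change` are hypothetical stand-ins for `RowKind` and `Row`, and `materialize` is a name made up for this illustration. Under this reading, the aggregate for "alice" should end up reflecting 14 rather than 12 + 13 + 14.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration of upsert changelog semantics; not Flink code. */
final class UpsertChangelogSketch {

    /** Hypothetical stand-in for RowKind, limited to the kinds an upsert stream carries. */
    enum Kind { INSERT, UPDATE_AFTER, DELETE }

    /** Hypothetical stand-in for a two-field Row (f0 = key, f1 = value). */
    static final class Change {
        final Kind kind;
        final String key;
        final int value;

        Change(Kind kind, String key, int value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }
    }

    /** Applies the changes in order and returns the latest value per key. */
    static Map<String, Integer> materialize(List<Change> changelog) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Change c : changelog) {
            if (c.kind == Kind.DELETE) {
                // A DELETE removes the row for that key.
                table.remove(c.key);
            } else {
                // INSERT and UPDATE_AFTER both overwrite the previous value for the key.
                table.put(c.key, c.value);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> changelog = List.of(
                new Change(Kind.INSERT, "alice", 12),
                new Change(Kind.UPDATE_AFTER, "alice", 13),
                new Change(Kind.UPDATE_AFTER, "alice", 14));
        System.out.println(materialize(changelog)); // prints {alice=14}
    }
}
```

The sketch only keeps the final state; the actual job would also emit the intermediate changes while the stream is processed.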


Reasons for this are:

- It doesn't really add a new feature but only exposes functionality that was merged before the feature freeze.

- Most of the changes are API-related checks and docs.

- It does not affect existing tests in any way. It is a completely separate feature in a parallel stack of method calls and classes.

- It gives a lot of value to users, as it allows them to work with changelog streams easily. This is a long-requested functionality.

- It allows users to play around with Flink's CDC engine without needing a connector (a big blocker for adoption at the moment).

- It helps in advertising Flink as a CDC engine.

- It helps us gather feedback and stabilize the CDC engine. Maybe issues like FLINK-20374 [2] would have been discovered earlier.

- And most importantly: it gives the last old API method (`StreamTableEnvironment.toRetractStream`) an alternative in the new type system, which means we could think about dropping it sooner.

I don't know if there will be another RC, but if that is the case, I would strongly vote for including this issue, provided it has been accepted in the review. Otherwise, we could also discuss including it in Flink 1.13.1.

Happy to hear your opinions.

Regards,
Timo


[1] https://issues.apache.org/jira/browse/FLINK-19980
[2] https://issues.apache.org/jira/browse/FLINK-20374
