Hi everyone,
sorry for being so late with this request, but fixing a couple of
downstream bugs had higher priority than this issue, and those bugs were
also blocking it. Nevertheless, I would like to ask for permission to
merge FLINK-19980[1] into the 1.13 branch as an experimental feature that
adds the API methods `StreamTableEnvironment.fromChangelogStream` and
`StreamTableEnvironment.toChangelogStream`.
It enables a smooth integration between the DataStream API and the Table
API, for example:
    DataStream<Row> dataStream =
        env.fromElements(
            Row.ofKind(RowKind.INSERT, "alice", 12),
            Row.ofKind(RowKind.UPDATE_AFTER, "alice", 13),
            Row.ofKind(RowKind.UPDATE_AFTER, "alice", 14));

    tEnv
        .fromChangelogStream(
            dataStream,
            Schema.newBuilder().primaryKey("f0").build(),
            ChangelogMode.upsert()
        )
        .groupBy($("f0"))
        .select($("f0"), $("f1").sum())
        .execute()
        .print();
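
To make the upsert semantics of that example concrete, here is a rough
plain-Java sketch with no Flink dependency. The class `UpsertChangelogSketch`
and its nested `Kind` and `Change` types are hypothetical stand-ins for
`RowKind` and `Row`, not part of the proposed API. It only illustrates the
idea that, with a primary key on f0, each UPDATE_AFTER replaces the previous
row for that key, so the sum per key should end at 14 rather than 39.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UpsertChangelogSketch {

    // Hypothetical stand-in for Flink's RowKind (only the kinds used here).
    enum Kind { INSERT, UPDATE_AFTER, DELETE }

    // Hypothetical stand-in for a Row with fields f0 (key) and f1 (value).
    static final class Change {
        final Kind kind;
        final String key;
        final int value;

        Change(Kind kind, String key, int value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }
    }

    // Applies the changes in order: INSERT and UPDATE_AFTER overwrite the
    // row stored for the key, DELETE removes it.
    static Map<String, Integer> materialize(List<Change> changelog) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Change change : changelog) {
            if (change.kind == Kind.DELETE) {
                table.remove(change.key);
            } else {
                table.put(change.key, change.value);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> changelog = Arrays.asList(
            new Change(Kind.INSERT, "alice", 12),
            new Change(Kind.UPDATE_AFTER, "alice", 13),
            new Change(Kind.UPDATE_AFTER, "alice", 14));

        // Only one row per key survives, so a sum grouped by key sees 14.
        System.out.println(materialize(changelog)); // prints {alice=14}
    }
}
```

In Flink itself the planner takes care of this; the sketch is only meant to
show why the primary key matters when the stream is declared as upsert.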
My reasons for merging this now are:
- It doesn't really add a new feature but only exposes functionality
that was merged before the feature freeze.
- Most of the changes are API-related checks and docs.
- It does not affect existing tests in any way. It is a completely
separate feature in a parallel stack of method calls and classes.
- It gives a lot of value to users, as it allows them to work with
changelog streams easily. This is a long-requested functionality.
- It allows users to play around with Flink's CDC engine without the
need for a connector (a big blocker for adoption at the moment).
- It helps in advertising Flink as a CDC engine.
- It helps us gather feedback and stabilize the CDC engine. Maybe
issues like FLINK-20374[2] would have been discovered earlier.
- And most importantly: It gives the last old API method
(StreamTableEnvironment.toRetractStream) an alternative in the new type
system which means we could think about dropping it sooner.
I don't know if there will be another RC, but if that is the case, I
would strongly vote for including this issue, provided it has been
accepted in the review. Otherwise we could also discuss including it in
Flink 1.13.1.
Happy to hear your opinions.
Regards,
Timo
[1] https://issues.apache.org/jira/browse/FLINK-19980
[2] https://issues.apache.org/jira/browse/FLINK-20374