gustavodemorais commented on code in PR #27901:
URL: https://github.com/apache/flink/pull/27901#discussion_r3063761339
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -75,26 +76,30 @@ The output columns are ordered as:
[partition_key_columns, remaining_columns_without_op]
```
-The op column is removed from the output. Output rows carry the appropriate
`RowKind` (INSERT, UPDATE_AFTER, or DELETE).
+The op column is removed from the output. Output rows carry the appropriate
change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE).
### Examples
#### Basic usage with standard op names
```sql
-- Input (append-only):
--- +I[id:1, op:'INSERT', name:'Alice']
--- +I[id:1, op:'UPDATE_AFTER', name:'Alice2']
--- +I[id:2, op:'DELETE', name:'Bob']
+-- +I[id:1, op:'INSERT', name:'Alice']
+-- +I[id:1, op:'UPDATE_BEFORE', name:'Alice']
+-- +I[id:1, op:'UPDATE_AFTER', name:'Alice2']
+-- +I[id:2, op:'DELETE', name:'Bob']
SELECT * FROM FROM_CHANGELOG(
input => TABLE cdc_stream PARTITION BY id
)
--- Output (upsert stream):
+-- Output (updating table):
-- +I[id:1, name:'Alice']
+-- -U[id:1, name:'Alice']
-- +U[id:1, name:'Alice2']
-- -D[id:2, name:'Bob']
+
+-- Table state after all events: {id:1, name:'Alice2'}
Review Comment:
nit: this looks more like json. Can you use a table format?
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala:
##########
@@ -1705,18 +1705,27 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
defaultTraitSet: T): T = {
val call = process.getCall
val definition = ShortcutUtils.unwrapFunctionDefinition(call)
- val changelogFunctionOpt: Option[ChangelogFunction] = definition match {
- case cf: ChangelogFunction => Some(cf)
- case builtIn: BuiltInFunctionDefinition if
builtIn.getChangelogFunction.isPresent =>
- Some(builtIn.getChangelogFunction.get())
- case _ => None
- }
- changelogFunctionOpt match {
- case Some(changelogFunction) =>
- val inputChangelogModes = children.map(toChangelogMode(_, None, None))
+ val inputChangelogModes = children.map(toChangelogMode(_, None, None))
+ val changelogModeOpt: Option[ChangelogMode] = definition match {
+ // User-defined PTFs that implement ChangelogFunction
+ case cf: ChangelogFunction =>
Review Comment:
_Ideally_ we don't touch this file. If you implement ChangelogFunction that
might be enough and we can then skip these changes
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java:
##########
Review Comment:
Can't we implement "ChangelogFunction" and then have a default
implementation keep the current behavior and then just change this for our new
builtin function
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]