raminqaf commented on code in PR #27901:
URL: https://github.com/apache/flink/pull/27901#discussion_r3058509949


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java:
##########
@@ -172,4 +172,30 @@ public interface PartitionedTable {
      *     columns
      */
     Table toChangelog(Expression... arguments);
+
+    /**
+     * Converts this append-only table with an explicit operation code column 
into a dynamic table
+     * using the built-in {@code FROM_CHANGELOG} process table function.
+     *
+     * <p>Each input row is expected to have a string operation code column 
(default: {@code "op"})
+     * that indicates the change operation (e.g., INSERT, UPDATE_AFTER, 
DELETE). The output table is

Review Comment:
   addressed



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##########
@@ -802,6 +807,47 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
                             
"org.apache.flink.table.runtime.functions.ptf.ToChangelogFunction")
                     .build();
 
+    public static final BuiltInFunctionDefinition FROM_CHANGELOG =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("FROM_CHANGELOG")
+                    .kind(PROCESS_TABLE)
+                    .staticArguments(
+                            StaticArgument.table(
+                                    "input",
+                                    Row.class,
+                                    false,
+                                    EnumSet.of(
+                                            StaticArgumentTrait.TABLE,
+                                            
StaticArgumentTrait.SET_SEMANTIC_TABLE)),
+                            StaticArgument.scalar("op", 
DataTypes.DESCRIPTOR(), true),
+                            StaticArgument.scalar(
+                                    "op_mapping",
+                                    DataTypes.MAP(DataTypes.STRING(), 
DataTypes.STRING()),
+                                    true))
+                    .changelogFunction(
+                            new ChangelogFunction() {
+                                @Override
+                                public ChangelogMode getChangelogMode(
+                                        ChangelogContext changelogContext) {
+                                    return ChangelogMode.upsert(false);

Review Comment:
   Please refer too: 
https://github.com/apache/flink/pull/27901#discussion_r3058504538



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to