gustavodemorais commented on code in PR #28025:
URL: https://github.com/apache/flink/pull/28025#discussion_r3195608760


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -158,21 +159,21 @@ private static Optional<List<DataType>> validateOpColumn(
         final TableSemantics tableSemantics = callContext.getTableSemantics(ARG_TABLE).get();
         final String opColumnName = resolveOpColumnName(callContext);
         final List<Field> inputFields = DataType.getFields(tableSemantics.dataType());
-        final Optional<Field> opField =
-                inputFields.stream().filter(f -> f.getName().equals(opColumnName)).findFirst();
-        if (opField.isEmpty()) {
+        final Integer opIndex = buildFieldNameToIndex(inputFields).get(opColumnName);

Review Comment:
   nit: maybe call buildFieldNameToIndex once and reuse the resulting map?
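
   A minimal sketch of the idea, not the PR's actual code: the class name `FieldIndexSketch`, the method `resolveIndices`, and the `List<String>` signature are hypothetical stand-ins (the real helper works on `List<Field>`). The map is built once and every later lookup goes through it.

   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical stand-in for illustration; the PR's helper takes List<Field>.
   final class FieldIndexSketch {

       // Maps each field name to its position; the first occurrence wins on duplicates.
       static Map<String, Integer> buildFieldNameToIndex(List<String> fieldNames) {
           Map<String, Integer> index = new HashMap<>();
           for (int i = 0; i < fieldNames.size(); i++) {
               index.putIfAbsent(fieldNames.get(i), i);
           }
           return index;
       }

       // Builds the map once, then reuses it for every lookup; -1 marks a missing name.
       static int[] resolveIndices(List<String> fieldNames, List<String> wanted) {
           Map<String, Integer> nameToIndex = buildFieldNameToIndex(fieldNames);
           int[] result = new int[wanted.size()];
           for (int i = 0; i < wanted.size(); i++) {
               Integer idx = nameToIndex.get(wanted.get(i));
               result[i] = idx == null ? -1 : idx;
           }
           return result;
       }

       public static void main(String[] args) {
           int[] r = resolveIndices(List.of("id", "name", "op"), List.of("op", "missing"));
           System.out.println(r[0] + " " + r[1]);
       }
   }
   ```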



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -61,7 +61,7 @@ SELECT * FROM FROM_CHANGELOG(
 
 | Parameter    | Required | Description |
 
|:-------------|:---------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `input`      | Yes      | The input table. Must be append-only. |
+| `input`      | Yes      | The input table. Must be append-only. Use `PARTITION BY` to ensure rows for the same key are processed together. This is required when downstream operators are keyed on that column. |

Review Comment:
   ```suggestion
   | `input`      | Yes      | The input table. Must be append-only. Use `PARTITION BY` to ensure rows for the same key are processed together. |
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -158,21 +159,21 @@ private static Optional<List<DataType>> validateOpColumn(
         final TableSemantics tableSemantics = callContext.getTableSemantics(ARG_TABLE).get();
         final String opColumnName = resolveOpColumnName(callContext);
         final List<Field> inputFields = DataType.getFields(tableSemantics.dataType());
-        final Optional<Field> opField =
-                inputFields.stream().filter(f -> f.getName().equals(opColumnName)).findFirst();
-        if (opField.isEmpty()) {
+        final Integer opIndex = buildFieldNameToIndex(inputFields).get(opColumnName);
+        if (opIndex == null) {
             return callContext.fail(
                     throwOnFailure,
                     String.format(
                             "The op column '%s' does not exist in the input schema.",
                             opColumnName));
         }
-        if (!opField.get().getDataType().getLogicalType().is(LogicalTypeFamily.CHARACTER_STRING)) {
+        final Field opField = inputFields.get(opIndex);
+        if (!opField.getDataType().getLogicalType().is(LogicalTypeFamily.CHARACTER_STRING)) {

Review Comment:
   Add a local variable `LogicalType opFieldType = opField.getDataType().getLogicalType();` and reuse it here and below.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java:
##########
@@ -172,6 +172,71 @@ public class FromChangelogTestPrograms {
                                     + "op => DESCRIPTOR(operation))")
                     .build();
 
+    // --------------------------------------------------------------------------------------------
+    // Set semantics with PARTITION BY
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Verifies that {@code FROM_CHANGELOG(TABLE t PARTITION BY id)} produces the same logical
+     * output as the row-semantic call. The conditional {@code SET_SEMANTIC_TABLE} trait switches
+     * the execution to a co-located parallel mode but must not change row-level semantics.
+     */

Review Comment:
   I think you can just remove this comment; the base tests are self-explanatory.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java:
##########
@@ -172,6 +172,71 @@ public class FromChangelogTestPrograms {
                                     + "op => DESCRIPTOR(operation))")
                     .build();
 
+    // --------------------------------------------------------------------------------------------
+    // Set semantics with PARTITION BY
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Verifies that {@code FROM_CHANGELOG(TABLE t PARTITION BY id)} produces the same logical
+     * output as the row-semantic call. The conditional {@code SET_SEMANTIC_TABLE} trait switches
+     * the execution to a co-located parallel mode but must not change row-level semantics.
+     */
+    public static final TableTestProgram SET_SEMANTICS_PARTITION_BY =

Review Comment:
   ```suggestion
       public static final TableTestProgram RETRACT_PARTITION_BY =
   ```



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -127,6 +127,14 @@ SELECT * FROM FROM_CHANGELOG(
 -- The operation column named 'operation' is used instead of 'op'
 ```
 
+#### Partitioning by a key
+

Review Comment:
   We need to add more information so that users know when to use it, for example:
   
   "Prefer row semantics: this should only be necessary if the rows for the same key are spread across partitions. In that case, consider also using ORDER BY [link] to fix the ordering."



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -127,6 +127,14 @@ SELECT * FROM FROM_CHANGELOG(
 -- The operation column named 'operation' is used instead of 'op'
 ```
 
+#### Partitioning by a key
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE cdc_stream PARTITION BY id
+)
+```
+

Review Comment:
   Make it clear here that this is the only way to preserve a primary key. Something like:
   
   "If you are producing an upsert table, that is, your CDC input stream produces UPDATE_AFTER but no UPDATE_BEFORE, the partition key you select here will be treated as your primary key and your upsert key by the engine. Make sure the PARTITION BY key exactly matches your primary key."



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -273,23 +274,54 @@ private static Optional<List<DataType>> validateErrorHandling(
         return Optional.empty();
     }
 
-    private static String resolveOpColumnName(final CallContext callContext) {
+    /**
+     * Resolves the op column name from the {@code op} descriptor argument, falling back to {@link
+     * #DEFAULT_OP_COLUMN_NAME} when the argument is omitted or empty.
+     */
+    public static String resolveOpColumnName(final CallContext callContext) {
         return callContext
                 .getArgumentValue(ARG_OP, ColumnList.class)
                 .filter(cl -> !cl.getNames().isEmpty())
                 .map(cl -> cl.getNames().get(0))
                 .orElse(DEFAULT_OP_COLUMN_NAME);
     }
 
-    private static List<Field> buildOutputFields(
+    /**
+     * Computes the indices of input columns that pass through to the function's output. Excludes
+     * the op column (becomes RowKind) and partition key columns (which the framework prepends

Review Comment:
   ```suggestion
        * the op column (becomes RowKind) and partition key columns, if present (which the framework prepends
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java:
##########
@@ -75,4 +75,18 @@ void testCustomOpMapping() {
                         + "error_handling => 'SKIP')",
                 CHANGELOG_MODE);
     }
+
+    @Test
+    void testSetSemanticsWithPartitionBy() {

Review Comment:
   ```suggestion
       void testRetractPartitionBy() {
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java:
##########
@@ -75,4 +75,18 @@ void testCustomOpMapping() {
                         + "error_handling => 'SKIP')",
                 CHANGELOG_MODE);
     }
+
+    @Test

Review Comment:
   Could you also add the upsert keys to the output plan for all the FromChangelogTest tests? I think it's useful to track this here so we can see where we keep it and catch regressions or changes in the future.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java:
##########
@@ -172,6 +172,71 @@ public class FromChangelogTestPrograms {
                                     + "op => DESCRIPTOR(operation))")
                     .build();
 
+    // --------------------------------------------------------------------------------------------
+    // Set semantics with PARTITION BY
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Verifies that {@code FROM_CHANGELOG(TABLE t PARTITION BY id)} produces the same logical
+     * output as the row-semantic call. The conditional {@code SET_SEMANTIC_TABLE} trait switches
+     * the execution to a co-located parallel mode but must not change row-level semantics.
+     */
+    public static final TableTestProgram SET_SEMANTICS_PARTITION_BY =

Review Comment:
   We can name the default cases like this
   
   ```
   INSERT
   RETRACT
   UPSERT
   RETRACT_PARTITION_BY
   UPSERT_PARTITION_BY
   ...
   ```



