gustavodemorais commented on code in PR #28025:
URL: https://github.com/apache/flink/pull/28025#discussion_r3195608760
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -158,21 +159,21 @@ private static Optional<List<DataType>> validateOpColumn(
final TableSemantics tableSemantics = callContext.getTableSemantics(ARG_TABLE).get();
final String opColumnName = resolveOpColumnName(callContext);
final List<Field> inputFields = DataType.getFields(tableSemantics.dataType());
- final Optional<Field> opField =
- inputFields.stream().filter(f -> f.getName().equals(opColumnName)).findFirst();
- if (opField.isEmpty()) {
+ final Integer opIndex = buildFieldNameToIndex(inputFields).get(opColumnName);
Review Comment:
nit: maybe call buildFieldNameToIndex once and reuse the resulting map?
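A minimal sketch of the nit, assuming `buildFieldNameToIndex` returns a `Map<String, Integer>` keyed by field name (inferred from the `.get(opColumnName)` call above). To stay self-contained it works on plain field names instead of Flink's `Field`; the class name, the stand-in helper body, and the extra partition-key lookup are hypothetical and only illustrate building the map once and reusing it.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class FieldIndexReuse {

    // Hypothetical stand-in for buildFieldNameToIndex: maps each field name to its position.
    static Map<String, Integer> buildFieldNameToIndex(List<String> fieldNames) {
        final Map<String, Integer> nameToIndex = new HashMap<>();
        for (int i = 0; i < fieldNames.size(); i++) {
            nameToIndex.put(fieldNames.get(i), i);
        }
        return nameToIndex;
    }

    public static void main(String[] args) {
        final List<String> inputFields = List.of("id", "op", "name");
        // Build the map once...
        final Map<String, Integer> nameToIndex = buildFieldNameToIndex(inputFields);
        // ...and reuse it for every lookup instead of rebuilding it at each call site.
        final Integer opIndex = nameToIndex.get("op");
        final Integer partitionKeyIndex = nameToIndex.get("id");
        final Integer missing = nameToIndex.get("operation"); // null: column does not exist
        System.out.println(opIndex + " " + partitionKeyIndex + " " + missing);
    }
}
```

A `null` result plays the role of the "op column does not exist" check in the hunk above.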
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -61,7 +61,7 @@ SELECT * FROM FROM_CHANGELOG(
| Parameter | Required | Description |
|:-------------|:---------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `input` | Yes | The input table. Must be append-only. |
+| `input` | Yes | The input table. Must be append-only. Use `PARTITION BY` to ensure rows for the same key are processed together. This is required when downstream operators are keyed on that column. |
Review Comment:
```suggestion
| `input` | Yes | The input table. Must be append-only. Use `PARTITION BY` to ensure rows for the same key are processed together. |
```
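To illustrate what "rows for the same key are processed together" means, here is a generic hash-partitioning sketch: each row is routed to a partition computed only from its key, so every change for a key lands in the same partition and keeps its arrival order there. This is an assumption-laden illustration, not Flink's actual key-to-subtask assignment (which uses key groups and a different hash); the `Row` type, its fields, and the method name are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class KeyPartitioningSketch {

    // Hypothetical CDC row: a key, an op code and a payload.
    static final class Row {
        final int id;
        final String op;
        final String value;

        Row(int id, String op, String value) {
            this.id = id;
            this.op = op;
            this.value = value;
        }

        @Override
        public String toString() {
            return op + "(" + id + "," + value + ")";
        }
    }

    // Routes every row to a partition derived only from its key, so all rows with the
    // same key end up in the same partition, in arrival order.
    static List<List<Row>> partitionByKey(List<Row> rows, int parallelism) {
        final List<List<Row>> partitions = new ArrayList<>();
        for (int p = 0; p < parallelism; p++) {
            partitions.add(new ArrayList<>());
        }
        for (Row row : rows) {
            final int p = Math.floorMod(Integer.hashCode(row.id), parallelism);
            partitions.get(p).add(row);
        }
        return partitions;
    }

    public static void main(String[] args) {
        final List<Row> rows = List.of(
                new Row(1, "INSERT", "a"),
                new Row(2, "INSERT", "x"),
                new Row(1, "UPDATE_AFTER", "b"),
                new Row(1, "DELETE", "b"));
        // Prints [[INSERT(2,x)], [INSERT(1,a), UPDATE_AFTER(1,b), DELETE(1,b)]]
        System.out.println(partitionByKey(rows, 2));
    }
}
```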
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -158,21 +159,21 @@ private static Optional<List<DataType>> validateOpColumn(
final TableSemantics tableSemantics = callContext.getTableSemantics(ARG_TABLE).get();
final String opColumnName = resolveOpColumnName(callContext);
final List<Field> inputFields = DataType.getFields(tableSemantics.dataType());
- final Optional<Field> opField =
- inputFields.stream().filter(f -> f.getName().equals(opColumnName)).findFirst();
- if (opField.isEmpty()) {
+ final Integer opIndex = buildFieldNameToIndex(inputFields).get(opColumnName);
+ if (opIndex == null) {
return callContext.fail(
throwOnFailure,
String.format(
"The op column '%s' does not exist in the input schema.",
opColumnName));
}
- if (!opField.get().getDataType().getLogicalType().is(LogicalTypeFamily.CHARACTER_STRING)) {
+ final Field opField = inputFields.get(opIndex);
+ if (!opField.getDataType().getLogicalType().is(LogicalTypeFamily.CHARACTER_STRING)) {
Review Comment:
Add a local `LogicalType opFieldType = opField.getDataType().getLogicalType()` and reuse it here and below.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java:
##########
@@ -172,6 +172,71 @@ public class FromChangelogTestPrograms {
+ "op => DESCRIPTOR(operation))")
.build();
+ // --------------------------------------------------------------------------------------------
+ // Set semantics with PARTITION BY
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Verifies that {@code FROM_CHANGELOG(TABLE t PARTITION BY id)} produces the same logical
+ * output as the row-semantic call. The conditional {@code SET_SEMANTIC_TABLE} trait switches
+ * the execution to a co-located parallel mode but must not change row-level semantics.
+ */
Review Comment:
I think you can just remove this comment; the base tests are self-explanatory.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java:
##########
@@ -172,6 +172,71 @@ public class FromChangelogTestPrograms {
+ "op => DESCRIPTOR(operation))")
.build();
+ // --------------------------------------------------------------------------------------------
+ // Set semantics with PARTITION BY
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Verifies that {@code FROM_CHANGELOG(TABLE t PARTITION BY id)} produces the same logical
+ * output as the row-semantic call. The conditional {@code SET_SEMANTIC_TABLE} trait switches
+ * the execution to a co-located parallel mode but must not change row-level semantics.
+ */
+ public static final TableTestProgram SET_SEMANTICS_PARTITION_BY =
Review Comment:
```suggestion
public static final TableTestProgram RETRACT_PARTITION_BY =
```
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -127,6 +127,14 @@ SELECT * FROM FROM_CHANGELOG(
-- The operation column named 'operation' is used instead of 'op'
```
+#### Partitioning by a key
+
Review Comment:
We need to add more info to tell users when they should use it:
```
Prefer row semantics: PARTITION BY should only be necessary if rows for the same key are spread across partitions. In that case, consider also using ORDER BY [link] to fix the ordering.
```
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -127,6 +127,14 @@ SELECT * FROM FROM_CHANGELOG(
-- The operation column named 'operation' is used instead of 'op'
```
+#### Partitioning by a key
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+ input => TABLE cdc_stream PARTITION BY id
+)
+```
+
Review Comment:
Make it clear here that this is the only way to preserve a primary key. Something like:
"If you are producing an upsert table, that is, if you are producing UPDATE_AFTER but no UPDATE_BEFORE from your CDC input stream, the engine treats the partition key you select here as both your primary key and your upsert key. Make sure the PARTITION BY key exactly matches your primary key."
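A minimal sketch of why the key choice matters for upsert output: with UPDATE_AFTER but no UPDATE_BEFORE, a downstream consumer can only locate the row being replaced through its key, so it materializes the table by overwriting whatever is stored under that key. The `Change` type, the op strings, and the in-memory map standing in for an upsert sink are hypothetical; this is not the engine's implementation.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class UpsertMaterializationSketch {

    // Hypothetical upsert change: op is INSERT, UPDATE_AFTER or DELETE (no UPDATE_BEFORE).
    static final class Change {
        final String op;
        final int key;
        final String value;

        Change(String op, int key, String value) {
            this.op = op;
            this.key = key;
            this.value = value;
        }
    }

    // Materializes the upsert stream into a table keyed by `key`. Without UPDATE_BEFORE,
    // the key is the only way to find the row that an UPDATE_AFTER replaces.
    static Map<Integer, String> materialize(List<Change> changes) {
        final Map<Integer, String> table = new LinkedHashMap<>();
        for (Change c : changes) {
            switch (c.op) {
                case "INSERT":
                case "UPDATE_AFTER":
                    table.put(c.key, c.value); // overwrite whatever is stored under the key
                    break;
                case "DELETE":
                    table.remove(c.key);
                    break;
                default:
                    throw new IllegalArgumentException("Unexpected op: " + c.op);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        final List<Change> changes = List.of(
                new Change("INSERT", 1, "alice"),
                new Change("INSERT", 2, "bob"),
                new Change("UPDATE_AFTER", 1, "alice v2"),
                new Change("DELETE", 2, null));
        System.out.println(materialize(changes)); // {1=alice v2}
    }
}
```

If the key used here were not the real primary key, two distinct rows sharing that key would silently overwrite each other in `table`.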
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -273,23 +274,54 @@ private static Optional<List<DataType>> validateErrorHandling(
return Optional.empty();
}
- private static String resolveOpColumnName(final CallContext callContext) {
+ /**
+ * Resolves the op column name from the {@code op} descriptor argument, falling back to {@link
+ * #DEFAULT_OP_COLUMN_NAME} when the argument is omitted or empty.
+ */
+ public static String resolveOpColumnName(final CallContext callContext) {
return callContext
.getArgumentValue(ARG_OP, ColumnList.class)
.filter(cl -> !cl.getNames().isEmpty())
.map(cl -> cl.getNames().get(0))
.orElse(DEFAULT_OP_COLUMN_NAME);
}
- private static List<Field> buildOutputFields(
+ /**
+ * Computes the indices of input columns that pass through to the function's output. Excludes
+ * the op column (becomes RowKind) and partition key columns (which the framework prepends
Review Comment:
```suggestion
* the op column (becomes RowKind) and partition key columns, if present (which the framework prepends
```
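A rough sketch of what this Javadoc describes, assuming input fields are identified by name and partition keys by index. The method name `passThroughIndices` and its signature are hypothetical, not the helper added in the PR. It also covers the row-semantic case with no partition keys, which is what the "if present" wording is about.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class PassThroughIndicesSketch {

    // Hypothetical helper: returns the positions of input columns that are forwarded
    // unchanged. The op column is dropped because it becomes the RowKind, and partition
    // key columns are dropped because the framework prepends them itself.
    static List<Integer> passThroughIndices(
            List<String> inputFieldNames, String opColumnName, Set<Integer> partitionKeyIndices) {
        final List<Integer> indices = new ArrayList<>();
        for (int i = 0; i < inputFieldNames.size(); i++) {
            final boolean isOpColumn = inputFieldNames.get(i).equals(opColumnName);
            if (!isOpColumn && !partitionKeyIndices.contains(i)) {
                indices.add(i);
            }
        }
        return indices;
    }

    public static void main(String[] args) {
        final List<String> fields = List.of("id", "op", "name", "price");
        // Row semantics: no partition keys, only the op column is excluded.
        System.out.println(passThroughIndices(fields, "op", Set.of())); // [0, 2, 3]
        // PARTITION BY id: column 0 is excluded as well.
        System.out.println(passThroughIndices(fields, "op", Set.of(0))); // [2, 3]
    }
}
```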
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java:
##########
@@ -75,4 +75,18 @@ void testCustomOpMapping() {
+ "error_handling => 'SKIP')",
CHANGELOG_MODE);
}
+
+ @Test
+ void testSetSemanticsWithPartitionBy() {
Review Comment:
```suggestion
void testRetractPartitionBy() {
```
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java:
##########
@@ -75,4 +75,18 @@ void testCustomOpMapping() {
+ "error_handling => 'SKIP')",
CHANGELOG_MODE);
}
+
+ @Test
Review Comment:
Could you also add the upsert keys to the output plans of all the FromChangelogTest cases? I think it's useful to track them here, to see where we preserve them and to catch regressions or changes in the future.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java:
##########
@@ -172,6 +172,71 @@ public class FromChangelogTestPrograms {
+ "op => DESCRIPTOR(operation))")
.build();
+ // --------------------------------------------------------------------------------------------
+ // Set semantics with PARTITION BY
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Verifies that {@code FROM_CHANGELOG(TABLE t PARTITION BY id)} produces the same logical
+ * output as the row-semantic call. The conditional {@code SET_SEMANTIC_TABLE} trait switches
+ * the execution to a co-located parallel mode but must not change row-level semantics.
+ */
+ public static final TableTestProgram SET_SEMANTICS_PARTITION_BY =
Review Comment:
We could name the default cases like this:
```
INSERT
RETRACT
UPSERT
RETRACT_PARTITION_BY
UPSERT_PARTITION_BY
...
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.