fhueske commented on code in PR #28199:
URL: https://github.com/apache/flink/pull/28199#discussion_r3274542944


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java:
##########
@@ -215,6 +215,37 @@ public class ToChangelogTestPrograms {
                                     + "op_mapping => MAP['INSERT,UPDATE_AFTER', 'C', 'DELETE', 'D'])")
                     .build();
 
+    public static final TableTestProgram UPSERT_PARTITION_BY_KEY_ONLY_DELETES =
+            TableTestProgram.of(
+                            "to-changelog-upsert-partition-by-key-only-deletes",
+                            "PARTITION BY upsert key + mapping without UB/DELETE handles key-only deletes")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.upsert(true))
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 20L),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L),
+                                            // Key-only delete: source only knows the key.
+                                            Row.ofKind(RowKind.DELETE, "Bob", null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("name STRING", "op STRING", "score BIGINT")
+                                    .consumedValues(
+                                            "+I[Alice, U, 10]",
+                                            "+I[Bob, U, 20]",
+                                            "+I[Alice, U, 30]")

Review Comment:
   Ah, now I understand the implications of not having a `DELETE` in the `op_mapping`.
   I thought we would still forward the partial delete, but we do not (it just
   ensures that we don't waste state on tracking deletes).
   
   Curious, is there a way to forward partial deletes?
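
To make the behavior discussed here concrete, the following is a minimal, Flink-free sketch. It assumes, as the expected sink output in the test above suggests, that a row whose kind has no entry in `op_mapping` is simply dropped. The `OpMappingSketch` class, its `Change` type, the `toChangelog` method, and the mapping `'INSERT,UPDATE_AFTER' -> 'U'` are hypothetical stand-ins for illustration, not the actual `TO_CHANGELOG` implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class OpMappingSketch {

    /** Hypothetical stand-in for a changelog row: row kind, key, and nullable value. */
    public static final class Change {
        final String kind;
        final String name;
        final Long score;

        public Change(String kind, String name, Long score) {
            this.kind = kind;
            this.name = name;
            this.score = score;
        }
    }

    /** Renders each mapped row as an insert-only record; unmapped kinds are skipped. */
    public static List<String> toChangelog(List<Change> input, Map<String, String> opMapping) {
        // Expand compound keys such as "INSERT,UPDATE_AFTER" into one entry per row kind.
        Map<String, String> opByKind = new HashMap<>();
        for (Map.Entry<String, String> entry : opMapping.entrySet()) {
            for (String kind : entry.getKey().split(",")) {
                opByKind.put(kind.trim(), entry.getValue());
            }
        }

        List<String> output = new ArrayList<>();
        for (Change change : input) {
            String op = opByKind.get(change.kind);
            if (op == null) {
                // Assumption: a kind without a mapping (here DELETE) is not forwarded.
                continue;
            }
            output.add("+I[" + change.name + ", " + op + ", " + change.score + "]");
        }
        return output;
    }
}
```

Under these assumptions, feeding the four source rows from the test through a mapping that only covers `INSERT,UPDATE_AFTER` yields the three sink rows listed above, and the key-only delete for `Bob` never reaches the sink.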



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java:
##########
@@ -68,4 +74,58 @@ static TraitCondition not(final TraitCondition condition) {
         return new BuiltInCondition(
                 BuiltInCondition.Kind.NOT, List.of(condition), ctx -> !condition.test(ctx));
     }
+
+    /** True when either {@code left} or {@code right} evaluates to true. */
+    static TraitCondition or(final TraitCondition left, final TraitCondition right) {
+        return new BuiltInCondition(
+                BuiltInCondition.Kind.OR,
+                List.of(left, right),
+                ctx -> left.test(ctx) || right.test(ctx));
+    }
+
+    /** True when the named scalar argument was provided by the caller. */
+    static TraitCondition argIsPresent(final String argName) {
+        return new BuiltInCondition(
+                BuiltInCondition.Kind.ARG_IS_PRESENT,
+                List.of(argName),
+                ctx -> ctx.hasScalarArgument(argName));
+    }
+
+    /**
+     * True when the named scalar argument is present and its value matches {@code predicate}. False
+     * when the argument is absent or cannot be resolved as a literal of {@code argClass}.
+     *
+     * <p>Use this for ad-hoc conditions on scalar literals. Prefer the named factories above when
+     * one fits.
+     */
+    static <X> TraitCondition argMatches(
+            final String argName, final Class<X> argClass, final Predicate<X> predicate) {
+        return new BuiltInCondition(
+                BuiltInCondition.Kind.ARG_MATCHES,
+                List.of(argName, argClass, predicate),
+                ctx -> ctx.getScalarArgument(argName, argClass).stream().anyMatch(predicate));
+    }
+
+    /**
+     * True when the named {@code MAP<STRING, STRING>} scalar argument is present and contains
+     * {@code key} among its keys. False when the argument is absent or cannot be resolved as a
+     * literal {@link Map}.
+     *
+     * <p>Also matches compound keys: if a key contains commas (e.g. {@code "INSERT,UPDATE_AFTER"}),
+     * each comma-separated part is trimmed and compared against {@code key} - useful for mappings
+     * where one entry covers multiple kinds.
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    static TraitCondition mapArgIncludesKey(final String argName, final String key) {
+        return argMatches(
+                argName, Map.class, map -> mapKeysContain((Map<String, String>) map, key));
+    }

Review Comment:
   Not sure if we should add this condition method.
   The hard cast to `Map<String, String>` and the custom key check with
   `split()` and `trim()` are specific to `TO_CHANGELOG` and not very generic, IMO.
   
   If we moved `mapKeysContain()` to `BuiltInFunctionDefinitions` (or some other
   utils class) as a `public static` helper, we also wouldn't need the `or` and
   `not` construct in the `withConditionalTrait()` method, because `argMatches()`
   already checks for arg presence.
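
The body of `mapKeysContain()` is not part of the quoted hunk, so the following is only a sketch of what such a standalone helper might look like, derived from the Javadoc of `mapArgIncludesKey` (split each key on commas, trim the parts, compare against `key`). The class name `ChangelogMappingUtils` and the null handling are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;

/** Hypothetical utility holder; the real location of the helper is up for discussion. */
public final class ChangelogMappingUtils {

    private ChangelogMappingUtils() {}

    /**
     * Returns true if any key of {@code map} equals {@code key}, where compound keys such as
     * "INSERT,UPDATE_AFTER" are split on commas and each part is trimmed before comparison.
     */
    public static boolean mapKeysContain(Map<String, String> map, String key) {
        if (map == null || key == null) {
            // Assumption: a missing map or key never matches.
            return false;
        }
        return map.keySet().stream()
                .filter(Objects::nonNull)
                .flatMap(mapKey -> Arrays.stream(mapKey.split(",")))
                .map(String::trim)
                .anyMatch(key::equals);
    }
}
```

With a helper along these lines, the call site could pass a lambda such as `map -> ChangelogMappingUtils.mapKeysContain(map, "DELETE")` to `argMatches()`, keeping the `TO_CHANGELOG`-specific key parsing out of `TraitCondition`.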
 



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java:
##########
@@ -68,4 +74,58 @@ static TraitCondition not(final TraitCondition condition) {
         return new BuiltInCondition(
                 BuiltInCondition.Kind.NOT, List.of(condition), ctx -> !condition.test(ctx));
     }
+
+    /** True when either {@code left} or {@code right} evaluates to true. */
+    static TraitCondition or(final TraitCondition left, final TraitCondition right) {
+        return new BuiltInCondition(
+                BuiltInCondition.Kind.OR,
+                List.of(left, right),
+                ctx -> left.test(ctx) || right.test(ctx));
+    }
+
+    /** True when the named scalar argument was provided by the caller. */
+    static TraitCondition argIsPresent(final String argName) {
+        return new BuiltInCondition(
+                BuiltInCondition.Kind.ARG_IS_PRESENT,
+                List.of(argName),
+                ctx -> ctx.hasScalarArgument(argName));
+    }
+
+    /**
+     * True when the named scalar argument is present and its value matches {@code predicate}. False
+     * when the argument is absent or cannot be resolved as a literal of {@code argClass}.

Review Comment:
   nit
   ```suggestion
        * when the argument is absent, cannot be resolved as a literal of {@code argClass}, or {@code predicate} evaluates to {@code false}.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
