[ 
https://issues.apache.org/jira/browse/FLINK-33989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836749#comment-17836749
 ] 

Balint Bene commented on FLINK-33989:
-------------------------------------

Out of curiosity, for my own understanding: is there an advantage to emitting 
tombstones for the update_before rows, as opposed to doing nothing? I'm looking 
at a use case where the sources are in Debezium format and we're trying to emit 
well-designed events from them. Unless I'm missing something, the tombstones 
are just extra events to filter out and drop.

It seems like the [KafkaWriter will do 
nothing|https://github.com/apache/flink-connector-kafka/blob/369e7be46a70fd50d68746498aed82105741e7d6/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L194]
 if the serialization step returns null instead of a tombstone. 
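
For illustration, a wrapper along these lines (a hedged sketch; the class and 
its wiring are mine, not something the connector ships) would drop the -U rows 
outright by returning null from serialize():

{code:java}
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.kafka.clients.producer.ProducerRecord;

/** Hypothetical wrapper: skip UPDATE_BEFORE rows instead of emitting tombstones. */
public class DropUpdateBeforeSerializer implements KafkaRecordSerializationSchema<RowData> {

    private final KafkaRecordSerializationSchema<RowData> delegate;

    public DropUpdateBeforeSerializer(KafkaRecordSerializationSchema<RowData> delegate) {
        this.delegate = delegate;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            RowData row, KafkaSinkContext context, Long timestamp) {
        if (row.getRowKind() == RowKind.UPDATE_BEFORE) {
            return null; // KafkaWriter skips null records entirely
        }
        return delegate.serialize(row, context, timestamp);
    }
}
{code}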

> Insert Statement With Filter Operation Generates Extra Tombstone using Upsert 
> Kafka Connector
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33989
>                 URL: https://issues.apache.org/jira/browse/FLINK-33989
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Table SQL / Runtime
>    Affects Versions: 1.17.2
>            Reporter: Flaviu Cicio
>            Priority: Major
>
> Given the following Flink SQL tables:
> {code:sql}
> CREATE TABLE input (
>   id STRING NOT NULL, 
>   current_value STRING NOT NULL, 
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka', 
>   'topic' = 'input', 
>   'key.format' = 'raw', 
>   'properties.bootstrap.servers' = 'kafka:29092', 
>   'properties.group.id' = 'your_group_id', 
>   'value.format' = 'json'
> );
> CREATE TABLE output (
>   id STRING NOT NULL, 
>   current_value STRING NOT NULL, 
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka', 
>   'topic' = 'output', 
>   'key.format' = 'raw', 
>   'properties.bootstrap.servers' = 'kafka:29092', 
>   'properties.group.id' = 'your_group_id', 
>   'value.format' = 'json'
> ); {code}
> And the following entries are present in the input Kafka topic:
> {code:json}
> [
>   {
>     "id": "1",
>     "current_value": "abc"
>   },
>   {
>     "id": "1",
>     "current_value": "abcd"
>   }
> ]{code}
> If we execute the following statement:
> {code:sql}
> INSERT INTO output SELECT id, current_value FROM input; {code}
> The following entries are published to the output Kafka topic:
> {code:json}
> [
>   {
>     "id": "1",
>     "current_value": "abc"
>   },
>   {
>     "id": "1",
>     "current_value": "abcd"
>   }
> ]{code}
> But if we execute the following statement:
> {code:sql}
> INSERT INTO output SELECT id, current_value FROM input WHERE id IN ('1'); 
> {code}
> The following entries are published:
> {code:json}
> [
>   {
>     "id": "1",
>     "current_value": "abc"
>   },
>   null,
>   {
>     "id": "1",
>     "current_value": "abcd"
>   }
> ]{code}
> We would expect the result to be the same for both INSERT statements.
> As we can see, the second statement generates an extra tombstone.
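> This appears to come from how the upsert-kafka sink serializes changelog rows: 
> UPDATE_BEFORE and DELETE rows are written with a null value, i.e. as Kafka 
> tombstones. Roughly (a paraphrased sketch, not the exact connector source):
> {code:java}
> // Paraphrased sketch of upsert-mode serialization: -U and -D become tombstones.
> byte[] key = keySerialization.serialize(row);
> byte[] value =
>         (row.getRowKind() == RowKind.UPDATE_BEFORE || row.getRowKind() == RowKind.DELETE)
>                 ? null // tombstone: null value under the primary-key bytes
>                 : valueSerialization.serialize(row);
> return new ProducerRecord<>(topic, key, value);
> {code}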
>  
> Moreover, if we run a SELECT on the input table:
> {code:sql}
> SELECT * FROM input;
> {code}
> We will get the following entries:
> ||op||id||current_value||
> |I|1|abc|
> |-U|1|abc|
> |+U|1|abcd|
> We expected to see only the insert and update_after entries.
> The update_before is added at DeduplicateFunctionHelper#122.
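> For reference, the relevant logic looks roughly like this (paraphrased from 
> DeduplicateFunctionHelper, not the exact source): when state already contains 
> a row for the key, the old row is re-emitted as update_before ahead of the new 
> update_after:
> {code:java}
> // Paraphrased sketch: retract the previous row, then emit the update.
> if (preRow != null) {
>     preRow.setRowKind(RowKind.UPDATE_BEFORE);
>     out.collect(preRow);
> }
> currentRow.setRowKind(RowKind.UPDATE_AFTER);
> out.collect(currentRow);
> {code}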
> This is easily reproducible with the following test, which we added to 
> UpsertKafkaTableITCase in flink-connector-kafka:
> {code:java}
>     @Test
>     public void testAggregateFilterOmit() throws Exception {
>         String topic = COUNT_FILTER_TOPIC + "_" + format;
>         createTestTopic(topic, 1, 1);
>         env.setParallelism(1);
>         // -------------   test   ---------------
>         countFilterToUpsertKafkaOmitUpdateBefore(topic);
>         // ------------- clean up ---------------
>         deleteTestTopic(topic);
>     }
>     private void countFilterToUpsertKafkaOmitUpdateBefore(String table) throws Exception {
>         String bootstraps = getBootstrapServers();
>         List<Row> data =
>                 Arrays.asList(
>                         Row.of(1, "Hi"),
>                         Row.of(1, "Hello"),
>                         Row.of(2, "Hello world"),
>                         Row.of(2, "Hello world, how are you?"),
>                         Row.of(2, "I am fine."),
>                         Row.of(3, "Luke Skywalker"),
>                         Row.of(3, "Comment#1"),
>                         Row.of(3, "Comment#2"),
>                         Row.of(4, "Comment#3"),
>                         Row.of(4, null));
>         final String createSource =
>                 String.format(
>                         "CREATE TABLE aggfilter_%s ("
>                                 + "  `id` INT,\n"
>                                 + "  `comment` STRING\n"
>                                 + ") WITH ("
>                                 + "  'connector' = 'values',"
>                                 + "  'data-id' = '%s'"
>                                 + ")",
>                         format, TestValuesTableFactory.registerData(data));
>         tEnv.executeSql(createSource);
>         final String createSinkTable =
>                 String.format(
>                         "CREATE TABLE %s (\n"
>                                 + "  `id` INT,\n"
>                                 + "  `comment` STRING,\n"
>                                 + "  PRIMARY KEY (`id`) NOT ENFORCED\n"
>                                 + ") WITH (\n"
>                                 + "  'connector' = 'upsert-kafka',\n"
>                                 + "  'topic' = '%s',\n"
>                                 + "  'properties.bootstrap.servers' = '%s',\n"
>                                 + "  'key.format' = '%s',\n"
>                                 + "  'value.format' = '%s'"
>                                 //+ "  'sink.omit-row-kind' = '-U'"
>                                 + ")",
>                         table, table, bootstraps, format, format);
>         tEnv.executeSql(createSinkTable);
>         String initialValues =
>                 "INSERT INTO "
>                         + table
>                         + " "
>                         + "SELECT * "
>                         + "FROM aggfilter_"
>                         + format
>                         + " "
>                         + "WHERE id > 2";
>         tEnv.executeSql(initialValues).await();
>         // ---------- read from the upsert sink -------------------
>         final List<Row> result =
>                 collectRows(tEnv.sqlQuery("SELECT * FROM " + table), 8);
>         // Expected: the filtered keys (3 and 4) as a clean upsert changelog,
>         // without extra tombstone (-D) events between the updates.
>         List<Row> expected =
>                 Arrays.asList(
>                         changelogRow("+I", 3, "Luke Skywalker"),
>                         changelogRow("-U", 3, "Luke Skywalker"),
>                         changelogRow("+U", 3, "Comment#1"),
>                         changelogRow("-U", 3, "Comment#1"),
>                         changelogRow("+U", 3, "Comment#2"),
>                         changelogRow("+I", 4, "Comment#3"),
>                         changelogRow("-U", 4, "Comment#3"),
>                         changelogRow("+U", 4, null));
>         assertThat(result).satisfies(matching(deepEqualTo(expected, true)));
>     }
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
