[ https://issues.apache.org/jira/browse/FLINK-33785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17794749#comment-17794749 ]

Bodong Liu commented on FLINK-33785:
------------------------------------

If this class will continue to be used in subsequent development, and this
report is indeed a bug, could this issue be assigned to me?

> TableJdbcUpsertOutputFormat could not deal with DELETE record correctly when 
> primary keys were set
> --------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33785
>                 URL: https://issues.apache.org/jira/browse/FLINK-33785
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: jdbc-3.1.1
>         Environment: Flink: 1.17.1
> Jdbc connector: 3.1.1
> Postgresql: 16.1
>            Reporter: Bodong Liu
>            Priority: Major
>         Attachments: image-2023-12-08-22-24-20-295.png, 
> image-2023-12-08-22-24-26-493.png, image-2023-12-08-22-24-58-986.png, 
> image-2023-12-08-22-28-44-948.png, image-2023-12-08-22-38-08-559.png, 
> image-2023-12-08-22-40-35-530.png, image-2023-12-08-22-42-06-566.png
>
>
> h1. Issue Description
> When using the JDBC connector to DELETE records from the database, I found 
> that it CANNOT delete records correctly.
> h1. Reproduction steps
> The steps are as follows:
>  * Create a table with 5 fields and a pk. DDL in postgres:
>  
> {code:sql}
> -- the id column's default references this sequence, so create it first
> create sequence if not exists fake_id_seq;
> create table public.fake
> (
>     id       bigint                 not null default nextval('fake_id_seq'::regclass),
>     name     character varying(128) not null,
>     age      integer,
>     location character varying(256),
>     birthday timestamp without time zone     default CURRENT_TIMESTAMP,
>     primary key (id, name)
> );{code}
> !image-2023-12-08-22-24-26-493.png!
>  
>  * Insert some data into the table:
> {code:sql}
> INSERT INTO public.fake (id, name, age, location, birthday) VALUES (1, 'Jack', 10, null, '2023-12-08 21:35:46.000000');
> INSERT INTO public.fake (id, name, age, location, birthday) VALUES (2, 'Jerry', 18, 'Fake Location', '2023-12-08 13:36:17.088295');
> INSERT INTO public.fake (id, name, age, location, birthday) VALUES (3, 'John', 20, null, null);
> INSERT INTO public.fake (id, name, age, location, birthday) VALUES (4, 'Marry', null, null, '2023-12-08 13:37:09.721785');
> {code}
> !image-2023-12-08-22-24-58-986.png!
>  * Run the Flink code:
> {code:java}
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     final String[] fieldNames = {"id", "name", "age", "location", "birthday"};
>     final int[] fieldTypes = {
>             Types.BIGINT, Types.VARCHAR, Types.INTEGER, Types.VARCHAR, Types.TIMESTAMP
>     };
>     final String[] primaryKeys = {"id", "name"};
>     InternalJdbcConnectionOptions internalJdbcConnectionOptions =
>             InternalJdbcConnectionOptions.builder()
>                     .setClassLoader(Thread.currentThread().getContextClassLoader())
>                     .setDriverName(Driver.class.getName())
>                     .setDBUrl("jdbc:postgresql://localhost:5432/postgres")
>                     .setUsername("postgres")
>                     .setPassword("postgres")
>                     .setTableName("fake")
>                     .setParallelism(1)
>                     .setConnectionCheckTimeoutSeconds(10)
>                     .setDialect(new PostgresDialect())
>                     .build();
>     JdbcOutputFormat<Tuple2<Boolean, Row>, Row, JdbcBatchStatementExecutor<Row>> jdbcOutputFormat =
>             JdbcOutputFormat.builder()
>                     .setFieldNames(fieldNames)
>                     .setKeyFields(primaryKeys)
>                     .setFieldTypes(fieldTypes)
>                     .setOptions(internalJdbcConnectionOptions)
>                     .setFlushIntervalMills(1000)
>                     .setFlushMaxSize(10)
>                     .setMaxRetryTimes(3)
>                     .build();
>     GenericJdbcSinkFunction<Tuple2<Boolean, Row>> jdbcSinkFunction =
>             new GenericJdbcSinkFunction<>(jdbcOutputFormat);
>     Timestamp timestamp = Timestamp.valueOf("2023-12-08 21:35:46.000000");
>     // Row to delete: f0 == false routes the element to the delete executor
>     Row row = Row.ofKind(RowKind.DELETE, 1L, "Jack", 10, null, timestamp);
>     Tuple2<Boolean, Row> element = Tuple2.of(false, row);
>     env.fromCollection(Collections.singleton(element)).addSink(jdbcSinkFunction);
>     env.execute();
> } {code}
> After the job finishes successfully, we can see that the record with id=1 
> and name='Jack' was not deleted.
> h1. Cause Analysis
> In the build() method of JdbcOutputFormat.Builder, if the 'keyFields' option 
> is set in the JdbcDmlOptions, the method returns an 
> 'org.apache.flink.connector.jdbc.internal.TableJdbcUpsertOutputFormat'.
> !image-2023-12-08-22-28-44-948.png!
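> For reference, the dispatch looks roughly like this (condensed from the 
> jdbc-3.1.1 sources, not verbatim):
> {code:java}
> // Condensed sketch of JdbcOutputFormat.Builder#build (not verbatim):
> // when key fields are configured, the upsert-capable output format is
> // returned, and it is the one responsible for handling DELETE records.
> if (dml.getKeyFields().isPresent() && dml.getKeyFields().get().length > 0) {
>     return new TableJdbcUpsertOutputFormat(
>             new SimpleJdbcConnectionProvider(options), dml, executionOptions);
> }
> // otherwise a plain append-only JdbcOutputFormat is built
> {code}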
> And in 
> 'org.apache.flink.connector.jdbc.internal.TableJdbcUpsertOutputFormat#createDeleteExecutor',
>  the method uses all the fieldNames instead of the keyFields to build the 
> DELETE statement, so the generated WHERE clause contains an equality 
> predicate for every column. A predicate like 'location = NULL' never 
> evaluates to true in SQL, so for rows containing NULL values the DELETE 
> matches nothing and silently removes zero rows.
> !image-2023-12-08-22-38-08-559.png!
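> The effect can be shown outside the connector. This stand-alone snippet 
> (hypothetical demo code, not connector source; it mirrors the 
> one-equality-predicate-per-field pattern of JdbcDialect#getDeleteStatement) 
> prints the statement the 3.1.1 executor effectively runs versus the 
> intended one:
> {code:java}
> import java.util.Arrays;
> import java.util.stream.Collectors;
> 
> // Hypothetical demo: build a DELETE statement with one equality predicate
> // per condition field, the way the JDBC dialect does.
> public class DeleteSqlDemo {
>     static String deleteStatement(String table, String[] conditionFields) {
>         String where = Arrays.stream(conditionFields)
>                 .map(f -> "\"" + f + "\" = :" + f)
>                 .collect(Collectors.joining(" AND "));
>         return "DELETE FROM \"" + table + "\" WHERE " + where;
>     }
> 
>     public static void main(String[] args) {
>         // What createDeleteExecutor builds today: conditions on ALL fields.
>         System.out.println(deleteStatement("fake",
>                 new String[] {"id", "name", "age", "location", "birthday"}));
>         // Binding location = NULL makes "location" = :location evaluate to
>         // UNKNOWN, so the row id=1/name='Jack' never matches and survives.
> 
>         // What it should build: conditions on the key fields only.
>         System.out.println(deleteStatement("fake", new String[] {"id", "name"}));
>     }
> }
> {code}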
> h1. How to fix
>  * Use the real keyFields when they are set, falling back to fieldNames 
> otherwise, to build the delete executor, as sketched after the screenshot 
> below.
> !image-2023-12-08-22-42-06-566.png!
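> A minimal sketch of that change (assuming JdbcDmlOptions#getKeyFields 
> returns an Optional<String[]>, as in the 3.1.1 sources; reconstructed from 
> the description above, not a verified patch):
> {code:java}
> // Inside TableJdbcUpsertOutputFormat#createDeleteExecutor (sketch):
> // derive the DELETE condition columns from the key fields when present,
> // falling back to all field names only when no key fields were configured.
> String[] deleteConditionFields =
>         dmlOptions.getKeyFields().orElse(dmlOptions.getFieldNames());
> String deleteSql =
>         dmlOptions.getDialect()
>                 .getDeleteStatement(dmlOptions.getTableName(), deleteConditionFields);
> {code}
> With this change, the job above should issue {{DELETE FROM "fake" WHERE "id" = ? AND "name" = ?}}, and the row is removed as expected.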


