[ https://issues.apache.org/jira/browse/FLINK-33785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17794749#comment-17794749 ]
Bodong Liu commented on FLINK-33785:
------------------------------------

If this class continues to be used in subsequent development, and this report
is indeed a bug, could this issue be assigned to me?

> TableJdbcUpsertOutputFormat could not deal with DELETE records correctly
> when primary keys were set
> --------------------------------------------------------------------------
>
>                 Key: FLINK-33785
>                 URL: https://issues.apache.org/jira/browse/FLINK-33785
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: jdbc-3.1.1
>         Environment: Flink: 1.17.1
> Jdbc connector: 3.1.1
> Postgresql: 16.1
>            Reporter: Bodong Liu
>            Priority: Major
>         Attachments: image-2023-12-08-22-24-20-295.png,
> image-2023-12-08-22-24-26-493.png, image-2023-12-08-22-24-58-986.png,
> image-2023-12-08-22-28-44-948.png, image-2023-12-08-22-38-08-559.png,
> image-2023-12-08-22-40-35-530.png, image-2023-12-08-22-42-06-566.png
>
> h1. Issue Description
> When using the JDBC connector to DELETE records from a database, I found
> that it cannot delete the records correctly.
> h1. Reproduction steps
> The steps are as follows:
> * Create a table with 5 fields and a composite primary key. DDL in
> Postgres:
> {code:sql}
> create table public.fake
> (
>     id       bigint                      not null default nextval('fake_id_seq'::regclass),
>     name     character varying(128)      not null,
>     age      integer,
>     location character varying(256),
>     birthday timestamp without time zone          default CURRENT_TIMESTAMP,
>     primary key (id, name)
> );
> {code}
> !image-2023-12-08-22-24-26-493.png!
> * Insert some data into the table:
> {code:sql}
> INSERT INTO public.fake (id, name, age, location, birthday) VALUES (1, 'Jack', 10, null, '2023-12-08 21:35:46.000000');
> INSERT INTO public.fake (id, name, age, location, birthday) VALUES (2, 'Jerry', 18, 'Fake Location', '2023-12-08 13:36:17.088295');
> INSERT INTO public.fake (id, name, age, location, birthday) VALUES (3, 'John', 20, null, null);
> INSERT INTO public.fake (id, name, age, location, birthday) VALUES (4, 'Marry', null, null, '2023-12-08 13:37:09.721785');
> {code}
> !image-2023-12-08-22-24-58-986.png!
> * Run the Flink code:
> {code:java}
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>     final String[] fieldNames = {"id", "name", "age", "location", "birthday"};
>     final int[] fieldTypes = {
>         Types.BIGINT, Types.VARCHAR, Types.INTEGER, Types.VARCHAR, Types.TIMESTAMP
>     };
>     final String[] primaryKeys = {"id", "name"};
>     InternalJdbcConnectionOptions internalJdbcConnectionOptions =
>             InternalJdbcConnectionOptions.builder()
>                     .setClassLoader(Thread.currentThread().getContextClassLoader())
>                     .setDriverName(Driver.class.getName())
>                     .setDBUrl("jdbc:postgresql://localhost:5432/postgres")
>                     .setUsername("postgres")
>                     .setPassword("postgres")
>                     .setTableName("fake")
>                     .setParallelism(1)
>                     .setConnectionCheckTimeoutSeconds(10)
>                     .setDialect(new PostgresDialect())
>                     .build();
>     JdbcOutputFormat<Tuple2<Boolean, Row>, Row, JdbcBatchStatementExecutor<Row>> jdbcOutputFormat =
>             JdbcOutputFormat.builder()
>                     .setFieldNames(fieldNames)
>                     .setKeyFields(primaryKeys)
>                     .setFieldTypes(fieldTypes)
>                     .setOptions(internalJdbcConnectionOptions)
>                     .setFlushIntervalMills(1000)
>                     .setFlushMaxSize(10)
>                     .setMaxRetryTimes(3)
>                     .build();
>     GenericJdbcSinkFunction<Tuple2<Boolean, Row>> jdbcSinkFunction =
>             new GenericJdbcSinkFunction<>(jdbcOutputFormat);
>     Timestamp timestamp = Timestamp.valueOf("2023-12-08 21:35:46.000000");
>     // Row to delete
>     Row row = Row.ofKind(RowKind.DELETE, 1L, "Jack", 10, null, timestamp);
>     Tuple2<Boolean, Row> element = Tuple2.of(false, row);
>     env.fromCollection(Collections.singleton(element)).addSink(jdbcSinkFunction);
>     env.execute();
> }
> {code}
> When the job completes successfully, we can see that the record with id=1
> and name='Jack' was not deleted.
> h1. Cause Analysis
> In the build method of JdbcOutputFormat.Builder, if the 'keyFields' option
> is set in the JdbcDmlOptions, the method returns a
> 'org.apache.flink.connector.jdbc.internal.TableJdbcUpsertOutputFormat'.
> !image-2023-12-08-22-28-44-948.png!
> In 'org.apache.flink.connector.jdbc.internal.TableJdbcUpsertOutputFormat#createDeleteExecutor',
> the method uses all the fieldNames instead of the keyFields to build the
> DELETE statement, so the DELETE may not match the rows it is meant to
> remove.
> !image-2023-12-08-22-38-08-559.png!
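> Concretely, the DELETE generated from all five fieldNames has roughly the
> following shape (a sketch only, assuming one equality predicate per field;
> the exact identifier quoting depends on the dialect):
> {code:sql}
> -- DELETE built from all fieldNames: every column, not just the primary
> -- key, ends up in the WHERE clause.
> DELETE FROM fake
> WHERE id = ? AND name = ? AND age = ? AND location = ? AND birthday = ?;
>
> -- For the repro row (1, 'Jack', 10, NULL, '2023-12-08 21:35:46'), the value
> -- bound for "location" is NULL. In SQL, "location = NULL" evaluates to
> -- UNKNOWN rather than TRUE, so the WHERE clause never matches and no row
> -- is deleted.
> {code}
> A DELETE constrained only by the primary key (id, name) cannot run into
> this problem, because primary key columns are NOT NULL by definition.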
> h1. How to fix
> * Use the real keyFields, falling back to fieldNames only when no key is
> set, to build the delete executor; a sketch of the resulting logic follows
> below.
> !image-2023-12-08-22-42-06-566.png!
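> Below is a minimal, self-contained sketch of the key-selection logic this
> fix implies. 'buildDeleteStatement' is a hypothetical helper written for
> illustration; it is not the actual method in TableJdbcUpsertOutputFormat:
> {code:java}
> import java.util.Arrays;
> import java.util.stream.Collectors;
>
> public final class DeleteStatementSketch {
>
>     /** Builds a DELETE whose WHERE clause uses the key fields when set. */
>     static String buildDeleteStatement(
>             String tableName, String[] fieldNames, String[] keyFields) {
>         // Prefer the real keyFields; fall back to all fieldNames only when
>         // no primary key was configured (keyFields may be null or empty).
>         String[] conditionFields =
>                 (keyFields != null && keyFields.length > 0) ? keyFields : fieldNames;
>         String whereClause =
>                 Arrays.stream(conditionFields)
>                         .map(field -> field + " = ?")
>                         .collect(Collectors.joining(" AND "));
>         return "DELETE FROM " + tableName + " WHERE " + whereClause;
>     }
>
>     public static void main(String[] args) {
>         String[] fieldNames = {"id", "name", "age", "location", "birthday"};
>         String[] primaryKeys = {"id", "name"};
>         // Prints: DELETE FROM fake WHERE id = ? AND name = ?
>         System.out.println(buildDeleteStatement("fake", fieldNames, primaryKeys));
>     }
> }
> {code}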
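> With that change in place, the DELETE issued for the repro row should
> constrain only the primary key columns, e.g.:
> {code:sql}
> -- Expected shape of the fixed DELETE (sketch): key columns only.
> DELETE FROM fake WHERE id = ? AND name = ?;
> {code}
> which matches and removes the row for (1, 'Jack') regardless of NULLs in
> the non-key columns.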