Bodong Liu created FLINK-33785:
----------------------------------
Summary: TableJdbcUpsertOutputFormat could not deal with DELETE
record correctly when primary keys were set
Key: FLINK-33785
URL: https://issues.apache.org/jira/browse/FLINK-33785
Project: Flink
Issue Type: Bug
Components: Connectors / JDBC
Affects Versions: jdbc-3.1.1
Environment: Flink: 1.17.1
Jdbc connector: 3.1.1
Postgresql: 16.1
Reporter: Bodong Liu
Attachments: image-2023-12-08-22-24-20-295.png,
image-2023-12-08-22-24-26-493.png, image-2023-12-08-22-24-58-986.png,
image-2023-12-08-22-28-44-948.png, image-2023-12-08-22-38-08-559.png,
image-2023-12-08-22-40-35-530.png, image-2023-12-08-22-42-06-566.png
h1. Issue Description
When using the JDBC connector to DELETE records in a database, I found that it
does not delete the records correctly.
h1. Reproduction steps
The steps are as follows:
* Create a table with 5 fields and a composite primary key (id, name). DDL in PostgreSQL:
{code:sql}
create table public.fake
(
    id       bigint not null default nextval('fake_id_seq'::regclass),
    name     character varying(128) not null,
    age      integer,
    location character varying(256),
    birthday timestamp without time zone default CURRENT_TIMESTAMP,
    primary key (id, name)
);{code}
!image-2023-12-08-22-24-26-493.png!
* Insert some data into the table:
{code:sql}
INSERT INTO public.fake (id, name, age, location, birthday)
VALUES (1, 'Jack', 10, null, '2023-12-08 21:35:46.000000');
INSERT INTO public.fake (id, name, age, location, birthday)
VALUES (2, 'Jerry', 18, 'Fake Location', '2023-12-08 13:36:17.088295');
INSERT INTO public.fake (id, name, age, location, birthday)
VALUES (3, 'John', 20, null, null);
INSERT INTO public.fake (id, name, age, location, birthday)
VALUES (4, 'Marry', null, null, '2023-12-08 13:37:09.721785');
{code}
!image-2023-12-08-22-24-58-986.png!
* Run the following Flink code:
{code:java}
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    final String[] fieldNames = {"id", "name", "age", "location", "birthday"};
    final int[] fieldTypes = {
        Types.BIGINT, Types.VARCHAR, Types.INTEGER, Types.VARCHAR,
        Types.TIMESTAMP
    };
    final String[] primaryKeys = {"id", "name"};
    InternalJdbcConnectionOptions internalJdbcConnectionOptions =
            InternalJdbcConnectionOptions.builder()
                    .setClassLoader(Thread.currentThread().getContextClassLoader())
                    .setDriverName(Driver.class.getName())
                    .setDBUrl("jdbc:postgresql://localhost:5432/postgres")
                    .setUsername("postgres")
                    .setPassword("postgres")
                    .setTableName("fake")
                    .setParallelism(1)
                    .setConnectionCheckTimeoutSeconds(10)
                    .setDialect(new PostgresDialect())
                    .build();
    // Setting key fields makes the builder return a TableJdbcUpsertOutputFormat
    JdbcOutputFormat<Tuple2<Boolean, Row>, Row,
            JdbcBatchStatementExecutor<Row>> jdbcOutputFormat =
            JdbcOutputFormat.builder()
                    .setFieldNames(fieldNames)
                    .setKeyFields(primaryKeys)
                    .setFieldTypes(fieldTypes)
                    .setOptions(internalJdbcConnectionOptions)
                    .setFlushIntervalMills(1000)
                    .setFlushMaxSize(10)
                    .setMaxRetryTimes(3)
                    .build();
    GenericJdbcSinkFunction<Tuple2<Boolean, Row>> jdbcSinkFunction =
            new GenericJdbcSinkFunction<>(jdbcOutputFormat);
    Timestamp timestamp = Timestamp.valueOf("2023-12-08 21:35:46.000000");
    // Row to delete
    Row row = Row.ofKind(RowKind.DELETE, 1L, "Jack", 10, null, timestamp);
    Tuple2<Boolean, Row> element = Tuple2.of(false, row);
    env.fromCollection(Collections.singleton(element)).addSink(jdbcSinkFunction);
    env.execute();
}
{code}
The job finishes successfully, but the record with id=1 and name=Jack has not
been deleted.
h1. Cause Analysis
In the build method of JdbcOutputFormat.Builder, if the 'keyFields' option is set
in the JdbcDmlOptions, the method returns an
'org.apache.flink.connector.jdbc.internal.TableJdbcUpsertOutputFormat'.
!image-2023-12-08-22-28-44-948.png!
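In
'org.apache.flink.connector.jdbc.internal.TableJdbcUpsertOutputFormat#createDeleteExecutor',
the method uses all of the fieldNames instead of the keyFields to build the DELETE
statement, so the DELETE may not match the intended row. One likely reason in the
reproduction above: the row to delete has a null location, and an equality
comparison against NULL never matches in SQL.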
!image-2023-12-08-22-38-08-559.png!
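To make the difference concrete, here is a minimal sketch of the two DELETE statements involved. DeleteStatementSketch and deleteStatement are hypothetical names used only for illustration; they are not part of the connector, and the real dialect code may quote identifiers or use named parameters rather than plain '?' placeholders.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Hypothetical helper for illustration; not part of the Flink JDBC connector. */
final class DeleteStatementSketch {

    /** Builds a parameterized DELETE whose WHERE clause compares every given field with '='. */
    public static String deleteStatement(String table, String[] conditionFields) {
        String where = Arrays.stream(conditionFields)
                .map(field -> field + " = ?")
                .collect(Collectors.joining(" AND "));
        return "DELETE FROM " + table + " WHERE " + where;
    }

    public static void main(String[] args) {
        String[] fieldNames = {"id", "name", "age", "location", "birthday"};
        String[] keyFields = {"id", "name"};

        // Built from all fields: binding a null location gives "location = NULL",
        // which is never true in SQL, so the row is not matched.
        System.out.println(deleteStatement("fake", fieldNames));

        // Built from the key fields only: the row is identified by its primary key.
        System.out.println(deleteStatement("fake", keyFields));
    }
}
```

With the table from the reproduction steps, only the second statement depends solely on the primary key columns, which are declared not null.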
h1. How to fix
* Build the delete executor from the actual keyFields, and fall back to fieldNames only when no keyFields are set.
!image-2023-12-08-22-42-06-566.png!
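The fallback described above could look roughly like the following sketch. DeleteFieldsSketch and deleteConditionFields are hypothetical names, and the actual patch to createDeleteExecutor may be structured differently.

```java
/** Hypothetical sketch of the proposed fallback; not the actual connector code. */
final class DeleteFieldsSketch {

    /** Returns the key fields when any are set, otherwise falls back to all field names. */
    public static String[] deleteConditionFields(String[] keyFields, String[] fieldNames) {
        boolean hasKeys = keyFields != null && keyFields.length > 0;
        return hasKeys ? keyFields : fieldNames;
    }

    public static void main(String[] args) {
        String[] fieldNames = {"id", "name", "age", "location", "birthday"};
        String[] keyFields = {"id", "name"};

        // Key fields are set, so only they are used in the DELETE condition.
        System.out.println(String.join(",", deleteConditionFields(keyFields, fieldNames)));

        // No key fields are set, so every field is used, as before.
        System.out.println(String.join(",", deleteConditionFields(null, fieldNames)));
    }
}
```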