[ 
https://issues.apache.org/jira/browse/FLINK-24961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17447177#comment-17447177
 ] 

Fangliang Liu commented on FLINK-24961:
---------------------------------------

Hi  [~jark] [~lzljs3620320], Take a look, thanks.

> When the DDL statement is different from the actual schema in the database, 
> ArrayIndexOutOfBoundsException will be reported 
> ----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-24961
>                 URL: https://issues.apache.org/jira/browse/FLINK-24961
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.13.2
>            Reporter: Fangliang Liu
>            Priority: Major
>
> DDL
> {code:java}
> CREATE TABLE if not exists table_a (
>        `user_id` BIGINT NULL COMMENT '',
>        `id` BIGINT NULL COMMENT '',
>        `position_id` BIGINT NULL COMMENT '',
>        `status` STRING NULL COMMENT '',
>        `transaction_id` BIGINT NULL COMMENT '',
>     PRIMARY KEY (`user_id`, `id`) NOT ENFORCED
>     ) WITH(
>           'connector'='kafka',
>           'topic'='xxxx',
>           'properties.bootstrap.servers'='xxx',
>           'properties.group.id'='xxx',
>           'properties.auto.offset.reset'='earliest',
>           'scan.startup.mode'='earliest-offset',
>           'format'='debezium-avro-confluent',
>           'debezium-avro-confluent.schema-registry.url'='xxxx'
>           );
> CREATE TABLE if not exists table_b (
>      `user_id` BIGINT NULL COMMENT '',
>      `id` BIGINT NULL COMMENT '',
>      `position_id` BIGINT NULL COMMENT '',
>      `status` STRING NULL COMMENT '',
>      `transaction_id` BIGINT NULL COMMENT '',
>     ) WITH (
>           'connector' = 'tidb',
>           'tidb.database.url' = 'jdbc:mysql://xxxx',
>           'tidb.username' = 'xxxx',
>           'tidb.password' = 'xxxxx',
>           'tidb.database.name' = 'xxxxx',
>           'tidb.maximum.pool.size' = '1',
>           'tidb.minimum.idle.size' = '1',
>           'tidb.table.name' = 'withdraws',
>           'tidb.write_mode' = 'upsert',
>           'sink.buffer-flush.max-rows' = '0'
>           );
> insert into table_b select * from table_a;
> {code}
> The actual schema in tidb has one more auto-increment column than table_b, 
> and the following error is reported when the task is started
> {code:java}
> java.lang.ArrayIndexOutOfBoundsException: -1
>       at 
> org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.lambda$createBufferReduceExecutor$1(JdbcDynamicOutputFormatBuilder.java:145)
>  ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
>       at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) 
> ~[?:1.8.0_291]
>       at 
> java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032)
>  ~[?:1.8.0_291]
>       at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) 
> ~[?:1.8.0_291]
>       at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) 
> ~[?:1.8.0_291]
>       at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) 
> ~[?:1.8.0_291]
>       at 
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) 
> ~[?:1.8.0_291]
>       at 
> java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
>  ~[?:1.8.0_291]
>       at 
> java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) 
> ~[?:1.8.0_291]
>       at 
> org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.createBufferReduceExecutor(JdbcDynamicOutputFormatBuilder.java:145)
>  ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
>       at 
> org.apache.flink.connector.jdbc.table.JdbcDynamicOutputFormatBuilder.lambda$build$edc08011$1(JdbcDynamicOutputFormatBuilder.java:106)
>  ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
>       at 
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.createAndOpenStatementExecutor(JdbcBatchingOutputFormat.java:142)
>  ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
>       at 
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:116)
>  ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
>       at 
> org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:49)
>  ~[flink-connector-jdbc_2.12-1.13.2.jar:1.13.2]
>       at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>  ~[flink-tidb-connector-1.13-0.0.4.jar:?]
>       at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>       at 
> org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:58)
>  ~[flink-table-blink_2.12-1.13.2.jar:1.13.2]
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) 
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>       at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_291]
> 2021-11-19 07:55:36,996 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager 
> [] - Clearing resource requirements of job 25e6800fc67392651c32db54b2fcc483 
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to