maoxingda created FLINK-36428:
---------------------------------
Summary: DynamoDB Table API Sink fails when there is a null value in the
RowData
Key: FLINK-36428
URL: https://issues.apache.org/jira/browse/FLINK-36428
Project: Flink
Issue Type: Bug
Components: Connectors / AWS
Affects Versions: 1.18.1
Reporter: maoxingda
Fix For: 1.19.1
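The DynamoDB Table API Sink fails with a NullPointerException when there are null values in the RowData, even though the sink table is declared with 'ignore-nulls' = 'true'. The following program reproduces the failure; the resulting stack trace is shown after it.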
package com.meican;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Minimal reproduction job: defines a three-row literal source view, declares a
 * DynamoDB sink table, and inserts every source row into that sink.
 *
 * <p>The third source row carries a null {@code name}, and the sink is declared
 * with {@code 'ignore-nulls' = 'true'}.
 */
public class SqlDynamodbSinkApp {

    /** Temporary view with three literal rows; the last one has {@code name} cast from null. */
    private static final String CREATE_SOURCE_VIEW =
            "create temporary view source as "
                    + "select '1' as id, 'name1' as name, 18 as age union all "
                    + "select '2' as id, 'name2' as name, 19 as age union all "
                    + "select '3' as id, cast(null as string) as name, 20 as age";

    /** Sink table backed by the {@code dynamodb} connector, partitioned by {@code id}. */
    private static final String CREATE_SINK_TABLE =
            "create table sink("
                    + " id string, name string, age int"
                    + ") partitioned by ( id )with("
                    + " 'connector' = 'dynamodb',"
                    + " 'aws.region' = 'cn-northwest-1',"
                    + " 'table-name' = 'bi-oltp-mydata',"
                    + " 'ignore-nulls' = 'true'"
                    + ")";

    /** Copies all rows of the source view into the sink table. */
    private static final String INSERT_INTO_SINK = "insert into sink select * from source";

    /**
     * Builds the table environment and runs the three SQL statements in order:
     * create the source view, create the sink table, then insert.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        StreamTableEnvironment tableEnv =
                StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment());

        tableEnv.executeSql(CREATE_SOURCE_VIEW);
        tableEnv.executeSql(CREATE_SINK_TABLE);
        tableEnv.executeSql(INSERT_INTO_SINK);
    }
}
java.lang.NullPointerException: null
at
org.apache.flink.table.data.conversion.StringStringConverter.toExternal(StringStringConverter.java:39)
~[flink-table-runtime-1.18.0.jar:1.18.0]
at
org.apache.flink.table.data.conversion.StringStringConverter.toExternal(StringStringConverter.java:27)
~[flink-table-runtime-1.18.0.jar:1.18.0]
at
org.apache.flink.connector.dynamodb.table.RowDataToAttributeValueConverter.lambda$addAttribute$0(RowDataToAttributeValueConverter.java:88)
~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
at
software.amazon.awssdk.enhanced.dynamodb.internal.mapper.ResolvedImmutableAttribute.lambda$create$0(ResolvedImmutableAttribute.java:54)
~[dynamodb-enhanced-2.20.144.jar:?]
at
software.amazon.awssdk.enhanced.dynamodb.mapper.StaticImmutableTableSchema.lambda$itemToMap$5(StaticImmutableTableSchema.java:518)
~[dynamodb-enhanced-2.20.144.jar:?]
at java.util.ArrayList.forEach(ArrayList.java:1541) ~[?:?]
at
java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085)
~[?:?]
at
software.amazon.awssdk.enhanced.dynamodb.mapper.StaticImmutableTableSchema.itemToMap(StaticImmutableTableSchema.java:516)
~[dynamodb-enhanced-2.20.144.jar:?]
at
software.amazon.awssdk.enhanced.dynamodb.mapper.WrappedTableSchema.itemToMap(WrappedTableSchema.java:67)
~[dynamodb-enhanced-2.20.144.jar:?]
at
org.apache.flink.connector.dynamodb.table.RowDataToAttributeValueConverter.convertRowData(RowDataToAttributeValueConverter.java:53)
~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
at
org.apache.flink.connector.dynamodb.table.RowDataElementConverter.apply(RowDataElementConverter.java:56)
~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
at
org.apache.flink.connector.dynamodb.table.RowDataElementConverter.apply(RowDataElementConverter.java:35)
~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
at
org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.write(AsyncSinkWriter.java:328)
~[flink-connector-files-1.17.2.jar:1.17.2]
at
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:161)
~[flink-streaming-java-1.18.0.jar:1.18.0]
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
~[flink-streaming-java-1.18.0.jar:1.18.0]
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
~[flink-streaming-java-1.18.0.jar:1.18.0]
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
~[flink-streaming-java-1.18.0.jar:1.18.0]
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
~[flink-streaming-java-1.18.0.jar:1.18.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
~[flink-streaming-java-1.18.0.jar:1.18.0]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
~[flink-streaming-java-1.18.0.jar:1.18.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
~[flink-streaming-java-1.18.0.jar:1.18.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
~[flink-streaming-java-1.18.0.jar:1.18.0]
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
~[flink-runtime-1.18.0.jar:1.18.0]
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
[flink-runtime-1.18.0.jar:1.18.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
[flink-runtime-1.18.0.jar:1.18.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
[flink-runtime-1.18.0.jar:1.18.0]
at java.lang.Thread.run(Thread.java:829) [?:?]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)