Cai Liuyang created FLINK-33934:
-----------------------------------
Summary: Flink SQL source using the raw format may lead to data loss
Key: FLINK-33934
URL: https://issues.apache.org/jira/browse/FLINK-33934
Project: Flink
Issue Type: Bug
Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table
SQL / Runtime
Reporter: Cai Liuyang
In our production environment we encountered a case of data loss. The job setup:
1. A Flink SQL job reads data from a message queue and writes it to Hive (it
selects only the value field, no metadata fields).
2. The source table uses the raw format.
However, if we select the value field and a metadata field at the same time, the
data loss does not occur.
After reviewing the code, we found that the cause is the object reuse in the
raw format (see
[RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]).
Why object reuse leads to this problem (taking Kafka as an example):
1. RawFormatDeserializationSchema is used in the fetcher thread of the
SourceOperator. The fetcher thread reads and deserializes data from the Kafka
partition, then puts the data into the elementQueue (see code [SourceOperator FetchTask
|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64]).
2. The SourceOperator's main thread pulls data from the elementQueue (which is
shared with the fetcher thread) and processes it (see code [SourceOperator main
thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188]).
3. RawFormatDeserializationSchema's deserialize function always returns the
same object (the [reused
object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]).
4. So if the elementQueue still holds elements that have not yet been consumed,
the fetcher thread can overwrite the fields of the reused RowData that
RawFormatDeserializationSchema::deserialize returned, which leads to data loss.
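The steps above can be condensed into a minimal single-threaded sketch (plain
Java, not Flink code; the class and field names below are made up for
illustration). One mutable object stands in for the reused GenericRowData, and
an ArrayDeque stands in for the elementQueue shared between the fetcher thread
and the main thread:

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class ReuseRace {
    // Stand-in for the single RowData instance reused by the deserializer.
    static class ReusableRow {
        byte[] value;
    }

    // Simulates the fetcher "deserializing" two records through one reused
    // object, then the main thread draining the queue.
    static String run() {
        ReusableRow reused = new ReusableRow();
        Queue<ReusableRow> elementQueue = new ArrayDeque<>();

        reused.value = "A".getBytes();   // deserialize record A
        elementQueue.add(reused);

        reused.value = "B".getBytes();   // deserialize record B into the SAME object
        elementQueue.add(reused);

        // Main thread drains the queue: both entries alias one object, so
        // record A has already been overwritten by record B.
        return new String(elementQueue.poll().value) + " "
                + new String(elementQueue.poll().value);
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints "B B": record A was lost
    }
}
```

The real job hits the same aliasing whenever the fetcher thread runs ahead of
the main thread, i.e. whenever the elementQueue is non-empty.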
The reason that selecting the value field and a metadata field together does
not hit the data loss is:
if we select a metadata field, a new RowData object is returned per record, see
code: [DynamicKafkaDeserializationSchema deserialize with metadata field
|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249],
and if we only select the value field, the RowData object returned by the
format's deserialization schema is reused, see code
[DynamicKafkaDeserializationSchema deserialize only with value
field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113].
To solve this problem, I think we should disable object reuse in
RawFormatDeserializationSchema.
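A sketch of what disabling reuse would mean (not a patch, and the names below
are illustrative): deserialize allocates a fresh row per call, so entries
already sitting in the shared queue can no longer be mutated by later calls:

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class NoReuseFix {
    // Stand-in for a RowData produced by the format.
    static class Row {
        final byte[] value;
        Row(byte[] value) { this.value = value; }
    }

    // Hypothetical deserialize() without object reuse: one new Row per
    // record, nothing shared between calls.
    static Row deserialize(byte[] message) {
        return new Row(message);
    }

    static String run() {
        Queue<Row> elementQueue = new ArrayDeque<>();
        elementQueue.add(deserialize("A".getBytes()));
        elementQueue.add(deserialize("B".getBytes()));   // does not touch A

        return new String(elementQueue.poll().value) + " "
                + new String(elementQueue.poll().value);
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints "A B": both records survive
    }
}
```

The trade-off is one extra allocation per record, which is the usual price of
making a deserialization schema safe for the multi-threaded source reader.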
--
This message was sent by Atlassian Jira
(v8.20.10#820010)