Cai Liuyang created FLINK-33934:
-----------------------------------

             Summary: Flink SQL Source use raw format maybe lead to data lost
                 Key: FLINK-33934
                 URL: https://issues.apache.org/jira/browse/FLINK-33934
             Project: Flink
          Issue Type: Bug
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
SQL / Runtime
            Reporter: Cai Liuyang


In our product we encounter a case that lead to data lost, the job info: 
   1. using flinkSQL that read data from messageQueue and write to hive (only 
select value field, doesn't contain metadata field)
   2. the format of source table is raw format
 
But if we select value field and metadata field at the same time, than the data 
lost will not appear
 
After we review the code, we found that the reason is the object reuse of 
Raw-format(see code 
[RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]),
 why object reuse will lead to this problem is below (take kafka as example):
    1. RawFormatDeserializationSchema will be use in the Fetcher-Thread of 
SourceOperator, Fetcher-Thread will read and deserialize data from kafka 
partition, than put data to ElementQueue (see code [SourceOperator FetcherTask 
|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64])
    2. SourceOperator's main thread will pull data from the ElementQueue(which 
is shared with the FetcherThread) and process it (see code [SourceOperator main 
thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188])
3. For RawFormatDeserializationSchema, its deserialize function will return the 
same object([reuse 
object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62])
4. So, if elementQueue have element that not be consumed, than the 
fetcherThread can change the filed of the reused rawData that 
RawFormatDeserializationSchema::deserialize returned, this will lead to data 
lost;
 
The reason that we select value and metadata field at the same time will not 
encounter data lost is:
   if we select metadata field there will return a new RowData object see code: 
[DynamicKafkaDeserializationSchema deserialize with metadata field 
|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249]
 and if we only select value filed, it will reuse the RowData object that 
formatDeserializationSchema returned see code 
[DynamicKafkaDeserializationSchema deserialize only with value 
field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113]
 
To solve this problem, i think we should diable reuse object of 
RawFormatDeserializationSchema.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to