Jiangjie Qin created FLINK-32398:
------------------------------------

             Summary: Support Avro SpecificRecord in DataStream and Table 
conversion.
                 Key: FLINK-32398
                 URL: https://issues.apache.org/jira/browse/FLINK-32398
             Project: Flink
          Issue Type: New Feature
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
    Affects Versions: 1.17.1
            Reporter: Jiangjie Qin


At this point, it seems that Avro SpecificRecord is not supported in DataStream 
and Table conversion. For example, the following code breaks when MyAvroRecord 
contains fields of type Record, Enum, Array, etc.

 
{code:java}
String schemaString = MyAvroRecord.getClassSchema().toString();
DataType dataType = AvroSchemaConverter.convertToDataType(schemaString);
TypeInformation<MyAvroRecord> typeInfo =
    AvroSchemaConverter.convertToTypeInfo(schemaString);

input.getTransformation().setOutputType(typeInfo);
tEnv.createTemporaryView("myTable", input);
Table result = tEnv.sqlQuery("SELECT * FROM myTable");
DataStream<MyAvroRecord> output = tEnv.toDataStream(result, dataType);
output.getTransformation().setOutputType(typeInfo); {code}
 

While the conversion from {{MyAvroRecord}} to {{RowData}} seems fine, several 
issues arise when converting the {{RowData}} back to {{MyAvroRecord}}, 
including but not limited to:
 # {{AvroSchemaConverter.convertToDataType(schema)}} maps Avro Record type to 
RowType, which loses the class information.
 # {{AvroSchemaConverter}} maps Enum to StringType, and simply tries to cast 
the string back to the Enum.

I did not find a way to easily convert between DataStream and Table for 
Avro SpecificRecord. Given the popularity of Avro SpecificRecord, we should 
support this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
