[ https://issues.apache.org/jira/browse/FLINK-19244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu updated FLINK-19244:
----------------------------
    Summary: CSV format can't deserialize null ROW field  (was: CsvRowDataDeserializationSchema throws cast exception : Row length mismatch.)

> CSV format can't deserialize null ROW field
> -------------------------------------------
>
>                 Key: FLINK-19244
>                 URL: https://issues.apache.org/jira/browse/FLINK-19244
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.11.2
>            Reporter: Ying Z
>            Assignee: Ying Z
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0, 1.11.3
>
>
> CREATE TABLE csv_table (
>  f0 ROW<f0c0 VARCHAR, f0c1 VARCHAR>,
>  f1 ROW<f1c0 INT, f1c1 VARCHAR>
>  )
> If f0 is null and f1c0=123, f1c1=456, the serialized data is: ,123;456 (the null f0 becomes an empty field, and the fields of f1 are joined with ';').
> When the data is deserialized, the JsonNode for f0 is an empty array ([]), so deserialization throws an exception: Row length mismatch. 2 fields expected but was 0.
> In the real scenario, I set up two streaming jobs:
>  First, read from json_table and write to csv_table, which has the schema above.
>  Then, read from csv_table and do further processing.
> If the JSON is \{"f0": null, "f1": {"f1c0": 123, "f1c1": 456}}, the second job fails with the exception above.
> If this is a bug, I would like to help fix it and add unit tests.
>  
> Here is the test code:
> {code:scala}
> import org.apache.flink.formats.csv.{CsvRowDataDeserializationSchema, CsvRowDataSerializationSchema}
> import org.apache.flink.table.api.DataTypes._
> import org.apache.flink.table.data.{GenericRowData, RowData}
> import org.apache.flink.table.data.binary.BinaryStringData
> import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo
> import org.apache.flink.table.types.logical.RowType
>
> // f0 is a nested row of two strings, f1 a nested row of two ints.
> val subDataType0 = ROW(
>   FIELD("f0c0", STRING()),
>   FIELD("f0c1", STRING())
> )
> val subDataType1 = ROW(
>   FIELD("f1c0", INT()),
>   FIELD("f1c1", INT())
> )
> val datatype = ROW(
>   FIELD("f0", subDataType0),
>   FIELD("f1", subDataType1))
> val rowType = datatype.getLogicalType.asInstanceOf[RowType]
> val serSchema = new CsvRowDataSerializationSchema.Builder(rowType).build()
> val deserSchema = new CsvRowDataDeserializationSchema.Builder(rowType, new RowDataTypeInfo(rowType)).build()
>
> // Serialize a row to CSV, print it, then deserialize it and print the result.
> def foo(r: RowData): Unit = {
>   val serData = new String(serSchema.serialize(r))
>   print(s"${serData}")
>   val deserRow = deserSchema.deserialize(serData.getBytes)
>   println(s"${deserRow}")
> }
> val normalRowData = GenericRowData.of(
>   GenericRowData.of(BinaryStringData.fromString("hello"), BinaryStringData.fromString("world")),
>   GenericRowData.of(Integer.valueOf(123), Integer.valueOf(456))
> )
> // Works as expected.
> foo(normalRowData)
> // f0 is null: serialization succeeds, but deserialization fails.
> val nullRowData = GenericRowData.of(
>   null,
>   GenericRowData.of(Integer.valueOf(123), Integer.valueOf(456))
> )
> /*
> Exception in thread "main" java.io.IOException: Failed to deserialize CSV row ',123;456
> ...
> Caused by: java.lang.RuntimeException: Row length mismatch. 2 fields expected but was 0.
>  */
> foo(nullRowData)
> {code}
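The failure mode can be modeled without Flink or Jackson. The Scala sketch below is hypothetical: `NestedRowCsv`, `serialize`, `deserializeStrict` and `deserializeLenient` are illustrative names, not Flink APIs, and this is not the patch merged for FLINK-19244. It makes three assumptions, matching the example above:

* `,` separates top-level fields.
* `;` separates the fields of a nested ROW.
* A null nested ROW is written as an empty field, which the reader sees as zero elements (the `[]` node).

The strict reader always checks arity and fails. The lenient reader treats zero elements as a null ROW.

```scala
// Hypothetical, dependency-free model of the CSV round trip for nested ROW fields.
// Assumptions: ',' separates top-level fields, ';' separates nested-row fields,
// and a null nested row is written as an empty field (as in ",123;456").
object NestedRowCsv {
  // A nested row is a sequence of field strings; None models a null ROW.
  type Nested = Option[Seq[String]]

  def serialize(row: Seq[Nested]): String =
    row.map {
      case None         => ""                    // null ROW -> empty field
      case Some(fields) => fields.mkString(";")  // nested fields joined by ';'
    }.mkString(",")

  // An empty field yields zero elements, mirroring the empty array node ([]).
  private def elements(field: String): Seq[String] =
    if (field.isEmpty) Seq.empty else field.split(";", -1).toSeq

  private def mismatch(arity: Int, actual: Int): Nothing =
    throw new RuntimeException(
      s"Row length mismatch. $arity fields expected but was $actual.")

  // Strict: always validates arity, so a null nested row cannot be read back.
  def deserializeStrict(line: String, arities: Seq[Int]): Seq[Nested] =
    line.split(",", -1).toSeq.zip(arities).map { case (field, arity) =>
      val elems = elements(field)
      if (elems.size != arity) mismatch(arity, elems.size)
      Some(elems)
    }

  // Lenient: zero elements means a null ROW; otherwise arity is still checked.
  def deserializeLenient(line: String, arities: Seq[Int]): Seq[Nested] =
    line.split(",", -1).toSeq.zip(arities).map { case (field, arity) =>
      val elems = elements(field)
      if (elems.isEmpty) None
      else if (elems.size != arity) mismatch(arity, elems.size)
      else Some(elems)
    }
}
```

Under these assumptions, the calls behave as follows:

* `serialize(Seq(None, Some(Seq("123", "456"))))` produces `,123;456`.
* `deserializeStrict` on that line throws "Row length mismatch. 2 fields expected but was 0.".
* `deserializeLenient` returns `None` for f0.

The lenient rule has a trade-off. A non-null one-field nested row holding an empty string is also written as an empty field, so it would be read back as null.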



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
