Hello everyone,

I've always used the DataStream API, and now I'm trying out the Table API to
create a DataStream from a CSV file. I've run into a couple of issues:

1) I'm reading a CSV with 7 fields in total, the 7th of which is a date
serialized as a Spark TimestampType, written in the CSV like this:
2019-07-19T15:31:38.000+01:00. I've defined the TableSource like this:

    val csvTableSource = CsvTableSource.builder()
      .path("sourcefile.csv")
      .fieldDelimiter(",")
      /* fields of Types.STRING */
      .field("time", Types.SQL_TIMESTAMP)
      .build()

I'm transforming the Table to a DataStream of type Event:

class Event {
  // fields of type String
  var time: Timestamp = _
}

val ds: DataStream[Event] = tEnv.toAppendStream[Event](table)

But when reading from the CSV, the following parsing error occurs:

Caused by: org.apache.flink.api.common.io.ParseException: Parsing error for
column 7 of row '......,2019-07-20T09:52:07.000+01:00' originated by
SqlTimestampParser: NUMERIC_VALUE_FORMAT_ERROR.

So, I'm wondering: is it possible to set a DateFormat or something similar to
make sure the parsing succeeds? I've also tried Types.SQL_DATE and
Types.SQL_TIME, but they fail with the same exception.
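
A minimal sketch of one possible workaround, under the assumption (not
confirmed here) that the parser only accepts the JDBC-style format that
java.sql.Timestamp.valueOf understands: declare the column as Types.STRING
and convert it after reading. The object and the parseIsoTimestamp helper are
hypothetical names, and the code relies only on the JDK, not on any Flink API.

```scala
import java.sql.Timestamp
import java.time.OffsetDateTime
import scala.util.Try

object SparkTimestampParsing {
  // Hypothetical helper (not part of Flink): parses an ISO-8601 string with a
  // UTC offset, e.g. "2019-07-19T15:31:38.000+01:00", into a java.sql.Timestamp.
  def parseIsoTimestamp(raw: String): Timestamp =
    Timestamp.from(OffsetDateTime.parse(raw).toInstant)

  def main(args: Array[String]): Unit = {
    val raw = "2019-07-19T15:31:38.000+01:00"

    // Timestamp.valueOf only accepts "yyyy-[m]m-[d]d hh:mm:ss[.f...]",
    // so the ISO-8601 form is rejected with an IllegalArgumentException.
    println(Try(Timestamp.valueOf(raw)).isFailure)

    // OffsetDateTime.parse understands the offset; the instant is 14:31:38 UTC.
    println(parseIsoTimestamp(raw).toInstant)
  }
}
```

Such a helper could be applied in a map step after converting the table to a
DataStream, with the "time" field read as a String instead of a Timestamp.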

2) My first option was to make Event a case class, but with the same
table definition I had trouble with the conversion: an error said that the
"Arity of 7 fields was not compatible with the destination
arity of 1, of type GenericType<Event>". What's the correct way to handle
case classes? I switched to a regular class (which I believe uses the POJO
serializer) and it works fine, but I'm still wondering how to make it work
with case classes, which come in quite useful sometimes.

Thank you very much,
Federico
-- 
Federico D'Ambrosio
