Hi Hequn,

Since CsvTableSource seems to be optimized for CSV parsing, I won't question
it too much.

Your second point sounds much better.
I can extend CsvTableSource with extra Avro schema matching
capabilities. Then, if the CSV file header doesn't match the Avro schema
specification, it throws an exception before parsing the whole CSV file.
Right?
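
To make the header check concrete, here is a minimal sketch in plain Java of
what I have in mind. It is not Flink or Avro code: the class CsvHeaderCheck,
the method checkHeader and HeaderMismatchException are hypothetical names, and
the expected column list is hard-coded where the real job would derive it from
the Avro schema (Schema.getFields() and Field.name()). The split is naive and
assumes no quoted delimiters in the header.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink or Avro.
final class CsvHeaderCheck {

    /** Thrown when the header row does not match the expected columns. */
    static final class HeaderMismatchException extends RuntimeException {
        HeaderMismatchException(String message) {
            super(message);
        }
    }

    /**
     * Reads only the first line and compares it, in order, with the expected
     * column names. Assumption: in the real job "expected" is built from the
     * Avro schema instead of being passed in as a plain list.
     */
    static void checkHeader(BufferedReader reader, String delimiter, List<String> expected) {
        String headerLine;
        try {
            headerLine = reader.readLine(); // only the header row is consumed
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (headerLine == null) {
            throw new HeaderMismatchException("Empty file: no header row");
        }
        // Naive split: does not handle delimiters inside quoted names.
        List<String> actual = new ArrayList<>();
        for (String name : headerLine.split(Pattern.quote(delimiter), -1)) {
            actual.add(name.trim());
        }
        if (!actual.equals(expected)) {
            throw new HeaderMismatchException(
                    "Expected columns " + expected + " but found " + actual);
        }
    }

    public static void main(String[] args) {
        List<String> expected = Arrays.asList("id", "name", "value");
        checkHeader(new BufferedReader(new StringReader("id,name,value\n1,a,2\n")), ",", expected);
        System.out.println("header accepted");
        try {
            checkHeader(new BufferedReader(new StringReader("id,value\n1,2\n")), ",", expected);
        } catch (HeaderMismatchException e) {
            System.out.println("file rejected: " + e.getMessage());
        }
    }
}
```

Since only the first line is read, a bad file is rejected before any data row
is parsed.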

I plan to check the quality of the data in two independent steps:
1. Check the "file structure", i.e. the number of columns and their names,
mainly by looking at the header row. Any error leads to rejection of the
whole file with a Java exception.
2. Check each row with UDFs to find the ones that aren't consistent with the
Avro schema. Any error is logged, but the rest of the file is processed and
loaded.
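
For the second step, the row-level logic could look like the sketch below. It
is plain Java only: CsvRowCheck, ColumnRule, validate and keepValidRows are
hypothetical names, and the per-column predicates stand in for whatever type
checks would be derived from the Avro schema. In the Flink job, I assume the
validate part would sit inside a user-defined function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.logging.Logger;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink or Avro.
final class CsvRowCheck {

    private static final Logger LOG = Logger.getLogger(CsvRowCheck.class.getName());

    /** Hypothetical per-column rule: a column name plus a check on the raw text. */
    static final class ColumnRule {
        final String name;
        final Predicate<String> isValid;

        ColumnRule(String name, Predicate<String> isValid) {
            this.name = name;
            this.isValid = isValid;
        }
    }

    /** Example check for a column that should hold a long value. */
    static boolean isLong(String raw) {
        try {
            Long.parseLong(raw.trim());
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    /** Returns an error message for an inconsistent row, or empty if the row is fine. */
    static Optional<String> validate(String[] fields, List<ColumnRule> rules) {
        if (fields.length != rules.size()) {
            return Optional.of("expected " + rules.size() + " fields but found " + fields.length);
        }
        for (int i = 0; i < fields.length; i++) {
            if (!rules.get(i).isValid.test(fields[i])) {
                return Optional.of("invalid value '" + fields[i] + "' for column " + rules.get(i).name);
            }
        }
        return Optional.empty();
    }

    /** Logs and skips inconsistent rows; every other row is kept for loading. */
    static List<String[]> keepValidRows(List<String> dataLines, String delimiter, List<ColumnRule> rules) {
        List<String[]> valid = new ArrayList<>();
        int lineNumber = 1; // the header is line 1, so data starts at line 2
        for (String line : dataLines) {
            lineNumber++;
            String[] fields = line.split(Pattern.quote(delimiter), -1);
            Optional<String> error = validate(fields, rules);
            if (error.isPresent()) {
                LOG.warning("Skipping line " + lineNumber + ": " + error.get());
            } else {
                valid.add(fields);
            }
        }
        return valid;
    }

    public static void main(String[] args) {
        List<ColumnRule> rules = Arrays.asList(
                new ColumnRule("id", CsvRowCheck::isLong),
                new ColumnRule("name", s -> !s.isEmpty()));
        List<String[]> valid = keepValidRows(
                Arrays.asList("1,alice", "x,bob", "3", "4,carol"), ",", rules);
        System.out.println(valid.size() + " valid rows");
    }
}
```

Unlike the header check, nothing here throws: a bad row only produces a log
entry, so the rest of the file still gets loaded.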

I guess that the first step doesn't require an AvroInputFormat, just a simple
Avro Schema object, and that the second would be more efficient with an
AvroInputFormat. Am I right?

Thanks for the useful input, all the best

François


2018-07-07 4:20 GMT+02:00 Hequn Cheng <chenghe...@gmail.com>:

> Hi francois,
>
> > I see that CsvTableSource allows defining CSV fields. Then, will it
> check if the columns actually exist in the file and throw an exception if not?
> Currently, CsvTableSource doesn't support Avro. CsvTableSource
> uses fieldDelim and rowDelim to parse data. But there is a workaround: read
> each line of the data as a single big column, i.e., the source table only has
> one column. Afterward, you can use a udtf[1] to split each line. You can
> throw away data or throw exceptions in the udtf as you wish.
>
> >  I want to check if the file structure is right before processing them.
> If you want to skip the whole file when the schema is erroneous, you can
> write a user-defined table source, and you will probably have to write a
> user-defined InputFormat. You can refer to AvroInputFormat[2] as an example.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-functions
> [2] https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java
>
> On Fri, Jul 6, 2018 at 11:32 PM, françois lacombe <
> francois.laco...@dcbrain.com> wrote:
>
>> Hi Hequn,
>>
>> The Table API is really great.
>> I will use it, and certainly love it, to solve the issues I mentioned before.
>>
>> One follow-up question regarding the Table API:
>> I've got my CSV files and the Avro schemas that describe them.
>> As my users can send erroneous files, inconsistent with the schemas, I want
>> to check if the file structure is right before processing them.
>> I see that CsvTableSource allows defining CSV fields. Then, will it
>> check if the columns actually exist in the file and throw an exception if not?
>>
>> Or is there any other way in Apache Avro to check if a CSV file is
>> consistent with a given schema?
>>
>> Big thanks for putting me on the Table API track :)
>>
>> Best R
>>
>> François Lacombe
>>
>>
>>
>> 2018-07-06 16:53 GMT+02:00 Hequn Cheng <chenghe...@gmail.com>:
>>
>>> Hi francois,
>>>
>>> If I understand correctly, you can use SQL or the Table API to solve your
>>> problem.
>>> As you want to project only some of the columns from the source, a columnar
>>> storage format like Parquet/ORC would be efficient. Currently, an ORC table
>>> source is supported in Flink; you can find more details here[1]. Also, there
>>> are many other table sources[2] you can choose from. With a TableSource, you
>>> can read the data, register it as a Table and do table operations through
>>> SQL[3] or the Table API[4].
>>>
>>> To make a JSON string from several columns, you can write a user-defined
>>> function[5].
>>>
>>> I also found an OrcTableSourceITCase[6] which I think may be helpful for
>>> you.
>>>
>>> Best, Hequn
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#orctablesource
>>> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#table-sources-sinks
>>> [3] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html
>>> [4] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html
>>> [5] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html
>>> [6] https://github.com/apache/flink/blob/master/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java
>>>
>>>
>>> On Fri, Jul 6, 2018 at 9:48 PM, françois lacombe <
>>> francois.laco...@dcbrain.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I'm a new user in the Flink community. This tool sounds great for
>>>> loading files with millions of rows into a pgsql db for a new project.
>>>>
>>>> Having read the docs and examples, I can't find a proper use case of
>>>> CSV loading into pgsql.
>>>> The file I want to load doesn't follow the same structure as the
>>>> table: I have to delete some columns and also build a JSON string from
>>>> several others before loading into pgsql.
>>>>
>>>> I plan to use the Flink 1.5 Java API and a batch process.
>>>> Is the DataSet class able to strip some columns out of the records
>>>> I load, or should I iterate over each record to delete the columns?
>>>>
>>>> Same question for making a JSON string from several columns of the same
>>>> record.
>>>> E.g. json_column = {"field1":col1, "field2":col2...}
>>>>
>>>> I work with files of 20 million rows, and it sounds pretty inefficient
>>>> to iterate over each record.
>>>> Can someone tell me if it's possible or if I have to change my mind
>>>> about this?
>>>>
>>>>
>>>> Thanks in advance, all the best
>>>>
>>>> François Lacombe
>>>>
>>>>
>>>
>>
>
