XML equivalent to JsonRowDeserializationSchema

2019-08-07 Thread françois lacombe
Hi everyone,

I've had a good experience with JsonRowDeserializationSchema to deserialise
nested json records according to Flink's TypeInformation.
As I look to process XML records now, does anyone use an equivalent for
this format please?

Nested records may be a pain to process sometimes and since Flink provides
a comprehensive model for type information it may be good to look for such
XML functions.

I did not find any InputFormat for XML too, did I miss something?

Thanks in advance, all the best

François Lacombe
DCbrain

-- 

 <http://www.dcbrain.com/>   <https://twitter.com/dcbrain_feed?lang=fr>   
<https://www.linkedin.com/company/dcbrain>   
<https://www.youtube.com/channel/UCSJrWPBLQ58fHPN8lP_SEGw>


 Pensez à la 
planète, imprimer ce papier que si nécessaire 


Improve data type validation in Flink inputformats according to avro schemas

2019-07-31 Thread françois lacombe
Hi all,

Following this JIRA ticket opened last year :
https://issues.apache.org/jira/browse/FLINK-9813

I use to run Flink, currently 1.8, in my professional activity (at DCbrain,
a French B2B software editor for physical fluid networks operators) and we
wish to share some of our experience with community as a lot of valuable
work is done here.
We began to write some custom InputFormats with particular functionalities
which may be useful to anyone interested in data validation.

Prior to address a large variety of formats, we were looking forward to
build them according to Avro schemas as explained in the JIRA ticket for
CSV. Then we now try to implement part of our data validation strategy
according to the schema provided to build the inputFormat. Avro schemas are
suitable for this and pretty nice to handle.
To me, type validation = check records against defined schema to redirect
bogus or unexpected ones to a dedicated output for administrative or data
engineering inspection and preserve rest of the job with conform records

We do have now an abstract class extending RichInputFormat allowing to
define type-validation-able inputformats with main advantages :
- Identify and log Row records involving data with different type than
specified in the schema
- Preserve type safety in most part of our jobs, starting in nextRecord()
method : we only send further what conforms to Avro schema
- Inspect streaming records on the fly without much processing workload

Currently we can provide type-validation capable intput formats for :
- Csv
- Json
- GeoJson
- ESRI Shape

JDBCInputFormat has been wrapped in dedicated logic involving Avro schemas
as well.

This approach is only a first step and will surely need to be improved,
reworked in case of mistake and so on...
According to me, currently Flink doesn't offer ability to redirect records
to alternative output, don't you?

How does Flink roadmap deal with such additional validation functionalities?
Would committers and users find desirable to introduce such functionality
in a further release?

Looking forward to read anyone interested, all the best

François Lacombe
DCbrain

-- 

 <http://www.dcbrain.com/>   <https://twitter.com/dcbrain_feed?lang=fr>   
<https://www.linkedin.com/company/dcbrain>   
<https://www.youtube.com/channel/UCSJrWPBLQ58fHPN8lP_SEGw>


 Pensez à la 
planète, imprimer ce papier que si nécessaire 


Re: Get nested Rows from Json string

2019-02-19 Thread françois lacombe
Hi Rong,

Thank you for JIRA.
Understood it may be solved in a next release, I'll comment the ticket in
case of further input

All the best

François

Le sam. 9 févr. 2019 à 00:57, Rong Rong  a écrit :

> Hi François,
>
> I just did some research and seems like this is in fact a Stringify issue.
> If you try running one of the AvroRowDeSerializationSchemaTest [1],
> you will find out that only MAP, ARRAY are correctly stringify (Map using
> "{}" quote and Array using "[]" quote).
> However nested records are not quoted using "()".
>
> Wasn't sure if this is consider as a bug for the toString method of the
> type Row. I just filed a JIRA [2] for this issue, feel free to comment on
> the discussion.
>
> --
> Rong
>
> [1]
> https://github.com/apache/flink/blob/release-1.7/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
> [2] https://issues.apache.org/jira/browse/FLINK-11569
>
> On Fri, Feb 8, 2019 at 8:51 AM françois lacombe <
> francois.laco...@dcbrain.com> wrote:
>
>> Hi Rong,
>>
>> Thank you for this answer.
>> I've changed Rows to Map, which ease the conversion process.
>>
>> Nevertheless I'm interested in any explanation about why row1.setField(i,
>> row2) appeends row2 at the end of row1.
>>
>> All the best
>>
>> François
>>
>> Le mer. 6 févr. 2019 à 19:33, Rong Rong  a écrit :
>>
>>> Hi François,
>>>
>>> I wasn't exactly sure this is a JSON object or JSON string you are
>>> trying to process.
>>> For a JSON string this [1] article might help.
>>> For a JSON object, I am assuming you are trying to convert it into a
>>> TableSource and processing using Table/SQL API, you could probably use the
>>> example here [2]
>>>
>>> BTW, a very remote hunch, this might be just a stringify issue how you
>>> print the row out.
>>>
>>> --
>>> Rong
>>>
>>> [1]:
>>> https://stackoverflow.com/questions/49380778/how-to-stream-a-json-using-flink
>>> [2]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sourceSinks.html#table-sources-sinks
>>>
>>> On Wed, Feb 6, 2019 at 3:06 AM françois lacombe <
>>> francois.laco...@dcbrain.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I currently get a json string from my pgsql source with nested objects
>>>> to be converted into Flink's Row.
>>>> Nested json objects should go in nested Rows.
>>>> An avro schema rules the structure my source should conform to.
>>>>
>>>> According to this json :
>>>> {
>>>>   "a":"b",
>>>>   "c":"d",
>>>>   "e":{
>>>>"f":"g"
>>>>}
>>>> }
>>>>
>>>> ("b", "d", Row("g")) is expected as a result according to my avro
>>>> schema.
>>>>
>>>> I wrote a recursive method which iterate over json objects and put
>>>> nested Rows at right indices in their parent but here is what outputs :
>>>> ("b", "d", "g")
>>>> Child Row is appended to the parent. I don't understand why.
>>>> Obviously, process is crashing arguing the top level Row arity doesn't
>>>> match serializers.
>>>>
>>>> Is there some native methods in Flink to achieve that?
>>>> I don't feel so comfortable to have written my own json processor for
>>>> this job.
>>>>
>>>> Do you have any hint which can help please ?
>>>>
>>>> All the best
>>>>
>>>> François
>>>>
>>>>
>>>>
>>>> <http://www.dcbrain.com/>   <https://twitter.com/dcbrain_feed?lang=fr>
>>>><https://www.linkedin.com/company/dcbrain>
>>>> <https://www.youtube.com/channel/UCSJrWPBLQ58fHPN8lP_SEGw>
>>>>
>>>> [image: Arbre vert.jpg] Pensez à la planète, imprimer ce papier que si
>>>> nécessaire
>>>>
>>>
>>
>> <http://www.dcbrain.com/>   <https://twitter.com/dcbrain_feed?lang=fr>
>> <https://www.linkedin.com/company/dcbrain>
>> <https://www.youtube.com/channel/UCSJrWPBLQ58fHPN8lP_SEGw>
>>
>> [image: Arbre vert.jpg] Pensez à la planète, imprimer ce papier que si
>> nécessaire
>>
>

-- 

 <http://www.dcbrain.com/>   <https://twitter.com/dcbrain_feed?lang=fr>   
<https://www.linkedin.com/company/dcbrain>   
<https://www.youtube.com/channel/UCSJrWPBLQ58fHPN8lP_SEGw>


 Pensez à la 
planète, imprimer ce papier que si nécessaire 


Re: How to load multiple same-format files with single batch job?

2019-02-19 Thread françois lacombe
Hi Fabian,

After a bit more documentation reading I have a better understanding of how
InputFormat interface works.
Indeed I've better to wrap a custom InputFormat implementation in my source.
This article helps a lot
https://brewing.codes/2017/02/06/implementing-flink-batch-data-connector/

connect() will be for a next sprint

All the best

François

Le ven. 15 févr. 2019 à 09:37, Fabian Hueske  a écrit :

> H François,
>
> The TableEnvironment.connect() method can only be used if you provide
> (quite a bit) more code.
> It requires a TableSourceFactory and handling of all the properties that
> are defined in the other builder methods. See [1].
>
> I would recommend to either register the BatchTableSource directly
> (tEnv.registerTableSource()) or get a DataSet (via env.createSource()) and
> register the DataSet as a Table (tEnv.registerDataSet()).
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/sourceSinks.html#define-a-tablefactory
>
>
> Am Mo., 11. Feb. 2019 um 21:09 Uhr schrieb françois lacombe <
> francois.laco...@dcbrain.com>:
>
>> Hi Fabian,
>>
>> I've got issues for a custom InputFormat implementation with my existing
>> code.
>>
>> Is this can be used in combination with a BatchTableSource custom source?
>> As I understand your solution, I should move my source to implementations
>> like :
>>
>> tableEnvironment
>>   .connect(...)
>>   .withFormat(...)
>>   .withSchema(...)
>>   .inAppendMode()
>>   .registerTableSource("MyTable")
>>
>> right?
>>
>> I currently have a BatchTableSource class which produce a DataSet
>> from a single geojson file.
>> This doesn't sound compatible with a custom InputFormat, don't you?
>>
>> Thanks in advance for any addition hint, all the best
>>
>> François
>>
>> Le lun. 4 févr. 2019 à 12:10, Fabian Hueske  a écrit :
>>
>>> Hi,
>>>
>>> The files will be read in a streaming fashion.
>>> Typically files are broken down into processing splits that are
>>> distributed to tasks for reading.
>>> How a task reads a file split depends on the implementation, but usually
>>> the format reads the split as a stream and does not read the split as a
>>> whole before emitting records.
>>>
>>> Best,
>>> Fabian
>>>
>>> Am Mo., 4. Feb. 2019 um 12:06 Uhr schrieb françois lacombe <
>>> francois.laco...@dcbrain.com>:
>>>
>>>> Hi Fabian,
>>>>
>>>> Thank you for this input.
>>>> This is interesting.
>>>>
>>>> With such an input format, will all the file will be loaded in memory
>>>> before to be processed or will all be streamed?
>>>>
>>>> All the best
>>>> François
>>>>
>>>> Le mar. 29 janv. 2019 à 22:20, Fabian Hueske  a
>>>> écrit :
>>>>
>>>>> Hi,
>>>>>
>>>>> You can point a file-based input format to a directory and the input
>>>>> format should read all files in that directory.
>>>>> That works as well for TableSources that are internally use file-based
>>>>> input formats.
>>>>> Is that what you are looking for?
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> Am Mo., 28. Jan. 2019 um 17:22 Uhr schrieb françois lacombe <
>>>>> francois.laco...@dcbrain.com>:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I'm wondering if it's possible and what's the best way to achieve the
>>>>>> loading of multiple files with a Json source to a JDBC sink ?
>>>>>> I'm running Flink 1.7.0
>>>>>>
>>>>>> Let's say I have about 1500 files with the same structure (same
>>>>>> format, schema, everything) and I want to load them with a *batch* job
>>>>>> Can Flink handle the loading of one and each file in a single source
>>>>>> and send data to my JDBC sink?
>>>>>> I wish I can provide the URL of the directory containing my thousand
>>>>>> files to the batch source to make it load all of them sequentially.
>>>>>> My sources and sinks are currently available for BatchTableSource, I
>>>>>> guess the cost to make them available for streaming would be quite
>>>>>> expensive for me for the moment.
>>>>>>
>

Re: [ANNOUNCE] New Flink PMC member Thomas Weise

2019-02-13 Thread françois lacombe
Congratulation Thomas

Thanks for help you provide and useful inputs

François

Le mer. 13 févr. 2019 à 03:13, Kurt Young  a écrit :

> Congrats Thomas!
>
> Best,
> Kurt
>
>
> On Wed, Feb 13, 2019 at 10:02 AM Shaoxuan Wang 
> wrote:
>
>> Congratulations, Thomas!
>>
>> On Tue, Feb 12, 2019 at 5:59 PM Fabian Hueske  wrote:
>>
>>> Hi everyone,
>>>
>>> On behalf of the Flink PMC I am happy to announce Thomas Weise as a new
>>> member of the Apache Flink PMC.
>>>
>>> Thomas is a long time contributor and member of our community.
>>> He is starting and participating in lots of discussions on our mailing
>>> lists, working on topics that are of joint interest of Flink and Beam, and
>>> giving talks on Flink at many events.
>>>
>>> Please join me in welcoming and congratulating Thomas!
>>>
>>> Best,
>>> Fabian
>>>
>>

-- 

       
   



 Pensez à la 
planète, imprimer ce papier que si nécessaire 


Re: How to load multiple same-format files with single batch job?

2019-02-11 Thread françois lacombe
Hi Fabian,

I've got issues for a custom InputFormat implementation with my existing
code.

Is this can be used in combination with a BatchTableSource custom source?
As I understand your solution, I should move my source to implementations
like :

tableEnvironment
  .connect(...)
  .withFormat(...)
  .withSchema(...)
  .inAppendMode()
  .registerTableSource("MyTable")

right?

I currently have a BatchTableSource class which produce a DataSet from
a single geojson file.
This doesn't sound compatible with a custom InputFormat, don't you?

Thanks in advance for any addition hint, all the best

François

Le lun. 4 févr. 2019 à 12:10, Fabian Hueske  a écrit :

> Hi,
>
> The files will be read in a streaming fashion.
> Typically files are broken down into processing splits that are
> distributed to tasks for reading.
> How a task reads a file split depends on the implementation, but usually
> the format reads the split as a stream and does not read the split as a
> whole before emitting records.
>
> Best,
> Fabian
>
> Am Mo., 4. Feb. 2019 um 12:06 Uhr schrieb françois lacombe <
> francois.laco...@dcbrain.com>:
>
>> Hi Fabian,
>>
>> Thank you for this input.
>> This is interesting.
>>
>> With such an input format, will all the file will be loaded in memory
>> before to be processed or will all be streamed?
>>
>> All the best
>> François
>>
>> Le mar. 29 janv. 2019 à 22:20, Fabian Hueske  a
>> écrit :
>>
>>> Hi,
>>>
>>> You can point a file-based input format to a directory and the input
>>> format should read all files in that directory.
>>> That works as well for TableSources that are internally use file-based
>>> input formats.
>>> Is that what you are looking for?
>>>
>>> Best, Fabian
>>>
>>> Am Mo., 28. Jan. 2019 um 17:22 Uhr schrieb françois lacombe <
>>> francois.laco...@dcbrain.com>:
>>>
>>>> Hi all,
>>>>
>>>> I'm wondering if it's possible and what's the best way to achieve the
>>>> loading of multiple files with a Json source to a JDBC sink ?
>>>> I'm running Flink 1.7.0
>>>>
>>>> Let's say I have about 1500 files with the same structure (same format,
>>>> schema, everything) and I want to load them with a *batch* job
>>>> Can Flink handle the loading of one and each file in a single source
>>>> and send data to my JDBC sink?
>>>> I wish I can provide the URL of the directory containing my thousand
>>>> files to the batch source to make it load all of them sequentially.
>>>> My sources and sinks are currently available for BatchTableSource, I
>>>> guess the cost to make them available for streaming would be quite
>>>> expensive for me for the moment.
>>>>
>>>> Have someone ever done this?
>>>> Am I wrong to expect doing so with a batch job?
>>>>
>>>> All the best
>>>>
>>>> François Lacombe
>>>>
>>>>
>>>> <http://www.dcbrain.com/>   <https://twitter.com/dcbrain_feed?lang=fr>
>>>><https://www.linkedin.com/company/dcbrain>
>>>> <https://www.youtube.com/channel/UCSJrWPBLQ58fHPN8lP_SEGw>
>>>>
>>>> [image: Arbre vert.jpg] Pensez à la planète, imprimer ce papier que si
>>>> nécessaire
>>>>
>>>
>>
>> <http://www.dcbrain.com/>   <https://twitter.com/dcbrain_feed?lang=fr>
>> <https://www.linkedin.com/company/dcbrain>
>> <https://www.youtube.com/channel/UCSJrWPBLQ58fHPN8lP_SEGw>
>>
>> [image: Arbre vert.jpg] Pensez à la planète, imprimer ce papier que si
>> nécessaire
>>
>

-- 

 <http://www.dcbrain.com/>   <https://twitter.com/dcbrain_feed?lang=fr>   
<https://www.linkedin.com/company/dcbrain>   
<https://www.youtube.com/channel/UCSJrWPBLQ58fHPN8lP_SEGw>


 Pensez à la 
planète, imprimer ce papier que si nécessaire 


[Table] Types of query result and tablesink do not match error

2019-02-08 Thread françois lacombe
Hi all,

An error is currently raised when using table.insertInto("registeredSink")
in Flink 1.7.0 when types of table and sink don't match.

I've got the following :
org.apache.flink.table.api.ValidationException: Field types of query result
and registered TableSink null do not match.
Query result schema: [dynamicFields: Map, staticFields: Map]
TableSink schema:[dynamicFields: Map, staticFields: Map]
at
org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:876)
at org.apache.flink.table.api.Table.insertInto(table.scala:918)

Schemas are the same
All fields got the GenericType type and I don't understand
why they are so different.

Have you any additional way to get extra debug information ?
Any hint ?

All the best

François

-- 

       
   



 Pensez à la 
planète, imprimer ce papier que si nécessaire 


Re: Get nested Rows from Json string

2019-02-08 Thread françois lacombe
Hi Rong,

Thank you for this answer.
I've changed Rows to Map, which ease the conversion process.

Nevertheless I'm interested in any explanation about why row1.setField(i,
row2) appeends row2 at the end of row1.

All the best

François

Le mer. 6 févr. 2019 à 19:33, Rong Rong  a écrit :

> Hi François,
>
> I wasn't exactly sure this is a JSON object or JSON string you are trying
> to process.
> For a JSON string this [1] article might help.
> For a JSON object, I am assuming you are trying to convert it into a
> TableSource and processing using Table/SQL API, you could probably use the
> example here [2]
>
> BTW, a very remote hunch, this might be just a stringify issue how you
> print the row out.
>
> --
> Rong
>
> [1]:
> https://stackoverflow.com/questions/49380778/how-to-stream-a-json-using-flink
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sourceSinks.html#table-sources-sinks
>
> On Wed, Feb 6, 2019 at 3:06 AM françois lacombe <
> francois.laco...@dcbrain.com> wrote:
>
>> Hi all,
>>
>> I currently get a json string from my pgsql source with nested objects to
>> be converted into Flink's Row.
>> Nested json objects should go in nested Rows.
>> An avro schema rules the structure my source should conform to.
>>
>> According to this json :
>> {
>>   "a":"b",
>>   "c":"d",
>>   "e":{
>>"f":"g"
>>}
>> }
>>
>> ("b", "d", Row("g")) is expected as a result according to my avro schema.
>>
>> I wrote a recursive method which iterate over json objects and put nested
>> Rows at right indices in their parent but here is what outputs : ("b", "d",
>> "g")
>> Child Row is appended to the parent. I don't understand why.
>> Obviously, process is crashing arguing the top level Row arity doesn't
>> match serializers.
>>
>> Is there some native methods in Flink to achieve that?
>> I don't feel so comfortable to have written my own json processor for
>> this job.
>>
>> Do you have any hint which can help please ?
>>
>> All the best
>>
>> François
>>
>>
>>
>> <http://www.dcbrain.com/>   <https://twitter.com/dcbrain_feed?lang=fr>
>> <https://www.linkedin.com/company/dcbrain>
>> <https://www.youtube.com/channel/UCSJrWPBLQ58fHPN8lP_SEGw>
>>
>> [image: Arbre vert.jpg] Pensez à la planète, imprimer ce papier que si
>> nécessaire
>>
>

-- 

 <http://www.dcbrain.com/>   <https://twitter.com/dcbrain_feed?lang=fr>   
<https://www.linkedin.com/company/dcbrain>   
<https://www.youtube.com/channel/UCSJrWPBLQ58fHPN8lP_SEGw>


 Pensez à la 
planète, imprimer ce papier que si nécessaire 


Get nested Rows from Json string

2019-02-06 Thread françois lacombe
Hi all,

I currently get a json string from my pgsql source with nested objects to
be converted into Flink's Row.
Nested json objects should go in nested Rows.
An avro schema rules the structure my source should conform to.

According to this json :
{
  "a":"b",
  "c":"d",
  "e":{
   "f":"g"
   }
}

("b", "d", Row("g")) is expected as a result according to my avro schema.

I wrote a recursive method which iterate over json objects and put nested
Rows at right indices in their parent but here is what outputs : ("b", "d",
"g")
Child Row is appended to the parent. I don't understand why.
Obviously, process is crashing arguing the top level Row arity doesn't
match serializers.

Is there some native methods in Flink to achieve that?
I don't feel so comfortable to have written my own json processor for this
job.

Do you have any hint which can help please ?

All the best

François

-- 

       
   



 Pensez à la 
planète, imprimer ce papier que si nécessaire 


Re: How to load multiple same-format files with single batch job?

2019-02-05 Thread françois lacombe
Thank you Fabian,

That's good, I'll go for a custom File input stream.

All the best

François

Le lun. 4 févr. 2019 à 12:10, Fabian Hueske  a écrit :

> Hi,
>
> The files will be read in a streaming fashion.
> Typically files are broken down into processing splits that are
> distributed to tasks for reading.
> How a task reads a file split depends on the implementation, but usually
> the format reads the split as a stream and does not read the split as a
> whole before emitting records.
>
> Best,
> Fabian
>
> Am Mo., 4. Feb. 2019 um 12:06 Uhr schrieb françois lacombe <
> francois.laco...@dcbrain.com>:
>
>> Hi Fabian,
>>
>> Thank you for this input.
>> This is interesting.
>>
>> With such an input format, will all the file will be loaded in memory
>> before to be processed or will all be streamed?
>>
>> All the best
>> François
>>
>> Le mar. 29 janv. 2019 à 22:20, Fabian Hueske  a
>> écrit :
>>
>>> Hi,
>>>
>>> You can point a file-based input format to a directory and the input
>>> format should read all files in that directory.
>>> That works as well for TableSources that are internally use file-based
>>> input formats.
>>> Is that what you are looking for?
>>>
>>> Best, Fabian
>>>
>>> Am Mo., 28. Jan. 2019 um 17:22 Uhr schrieb françois lacombe <
>>> francois.laco...@dcbrain.com>:
>>>
>>>> Hi all,
>>>>
>>>> I'm wondering if it's possible and what's the best way to achieve the
>>>> loading of multiple files with a Json source to a JDBC sink ?
>>>> I'm running Flink 1.7.0
>>>>
>>>> Let's say I have about 1500 files with the same structure (same format,
>>>> schema, everything) and I want to load them with a *batch* job
>>>> Can Flink handle the loading of one and each file in a single source
>>>> and send data to my JDBC sink?
>>>> I wish I can provide the URL of the directory containing my thousand
>>>> files to the batch source to make it load all of them sequentially.
>>>> My sources and sinks are currently available for BatchTableSource, I
>>>> guess the cost to make them available for streaming would be quite
>>>> expensive for me for the moment.
>>>>
>>>> Have someone ever done this?
>>>> Am I wrong to expect doing so with a batch job?
>>>>
>>>> All the best
>>>>
>>>> François Lacombe
>>>>
>>>>
>>>> <http://www.dcbrain.com/>   <https://twitter.com/dcbrain_feed?lang=fr>
>>>><https://www.linkedin.com/company/dcbrain>
>>>> <https://www.youtube.com/channel/UCSJrWPBLQ58fHPN8lP_SEGw>
>>>>
>>>> [image: Arbre vert.jpg] Pensez à la planète, imprimer ce papier que si
>>>> nécessaire
>>>>
>>>
>>
>> <http://www.dcbrain.com/>   <https://twitter.com/dcbrain_feed?lang=fr>
>> <https://www.linkedin.com/company/dcbrain>
>> <https://www.youtube.com/channel/UCSJrWPBLQ58fHPN8lP_SEGw>
>>
>> [image: Arbre vert.jpg] Pensez à la planète, imprimer ce papier que si
>> nécessaire
>>
>

-- 

 <http://www.dcbrain.com/>   <https://twitter.com/dcbrain_feed?lang=fr>   
<https://www.linkedin.com/company/dcbrain>   
<https://www.youtube.com/channel/UCSJrWPBLQ58fHPN8lP_SEGw>


 Pensez à la 
planète, imprimer ce papier que si nécessaire 


Re: How to load multiple same-format files with single batch job?

2019-02-04 Thread françois lacombe
Hi Fabian,

Thank you for this input.
This is interesting.

With such an input format, will all the file will be loaded in memory
before to be processed or will all be streamed?

All the best
François

Le mar. 29 janv. 2019 à 22:20, Fabian Hueske  a écrit :

> Hi,
>
> You can point a file-based input format to a directory and the input
> format should read all files in that directory.
> That works as well for TableSources that are internally use file-based
> input formats.
> Is that what you are looking for?
>
> Best, Fabian
>
> Am Mo., 28. Jan. 2019 um 17:22 Uhr schrieb françois lacombe <
> francois.laco...@dcbrain.com>:
>
>> Hi all,
>>
>> I'm wondering if it's possible and what's the best way to achieve the
>> loading of multiple files with a Json source to a JDBC sink ?
>> I'm running Flink 1.7.0
>>
>> Let's say I have about 1500 files with the same structure (same format,
>> schema, everything) and I want to load them with a *batch* job
>> Can Flink handle the loading of one and each file in a single source and
>> send data to my JDBC sink?
>> I wish I can provide the URL of the directory containing my thousand
>> files to the batch source to make it load all of them sequentially.
>> My sources and sinks are currently available for BatchTableSource, I
>> guess the cost to make them available for streaming would be quite
>> expensive for me for the moment.
>>
>> Have someone ever done this?
>> Am I wrong to expect doing so with a batch job?
>>
>> All the best
>>
>> François Lacombe
>>
>>
>> <http://www.dcbrain.com/>   <https://twitter.com/dcbrain_feed?lang=fr>
>> <https://www.linkedin.com/company/dcbrain>
>> <https://www.youtube.com/channel/UCSJrWPBLQ58fHPN8lP_SEGw>
>>
>> [image: Arbre vert.jpg] Pensez à la planète, imprimer ce papier que si
>> nécessaire
>>
>

-- 

 <http://www.dcbrain.com/>   <https://twitter.com/dcbrain_feed?lang=fr>   
<https://www.linkedin.com/company/dcbrain>   
<https://www.youtube.com/channel/UCSJrWPBLQ58fHPN8lP_SEGw>


 Pensez à la 
planète, imprimer ce papier que si nécessaire 


How to load multiple same-format files with single batch job?

2019-01-28 Thread françois lacombe
Hi all,

I'm wondering if it's possible and what's the best way to achieve the
loading of multiple files with a Json source to a JDBC sink ?
I'm running Flink 1.7.0

Let's say I have about 1500 files with the same structure (same format,
schema, everything) and I want to load them with a *batch* job
Can Flink handle the loading of one and each file in a single source and
send data to my JDBC sink?
I wish I can provide the URL of the directory containing my thousand files
to the batch source to make it load all of them sequentially.
My sources and sinks are currently available for BatchTableSource, I guess
the cost to make them available for streaming would be quite expensive for
me for the moment.

Have someone ever done this?
Am I wrong to expect doing so with a batch job?

All the best

François Lacombe

-- 

 <http://www.dcbrain.com/>   <https://twitter.com/dcbrain_feed?lang=fr>   
<https://www.linkedin.com/company/dcbrain>   
<https://www.youtube.com/channel/UCSJrWPBLQ58fHPN8lP_SEGw>


 Pensez à la 
planète, imprimer ce papier que si nécessaire 


Re: Missing Calcite SQL functions in table API

2018-09-06 Thread françois lacombe
Thanks Fabian,

I didn't notice select() wasn't SQL compliant.
sqlQuery works fine, it's all right :)


All the best

François

2018-09-05 12:30 GMT+02:00 Fabian Hueske :

> Hi
>
> You are using SQL syntax in a Table API query. You have to stick to Table
> API syntax or use SQL as
>
> tEnv.sqlQuery("SELECT col1,CONCAT('field1:',col2,',field2:',CAST(col3 AS
> string)) FROM csvTable")
>
> The Flink documentation lists all supported functions for Table API [1]
> and SQL [2].
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/
> tableApi.html
> [2] https://ci.apache.org/projects/flink/flink-docs-
> release-1.6/dev/table/sql.html
>
> 2018-09-05 12:22 GMT+02:00 françois lacombe 
> :
>
>> Hi all,
>>
>> I'm trying to use CONVERT or CAST functions from Calcite docs to query
>> some table with Table API.
>> https://calcite.apache.org/docs/reference.html
>>
>> csv_table.select("col1,CONCAT('field1:',col2,',field2:',CAST(col3 AS
>> string))");
>> col3 is actually described as int the CSV schema and CONCAT doesn't like
>> it.
>>
>> An exception is thrown "Undefined function: CAST"
>>
>> The docs mention that SQL implementation is based on Calcite and is there
>> a list of available functions please?
>> May I skip some useful dependency?
>>
>>
>> Thanks in advance
>>
>> François
>>
>>
>


Missing Calcite SQL functions in table API

2018-09-05 Thread françois lacombe
Hi all,

I'm trying to use CONVERT or CAST functions from Calcite docs to query some
table with Table API.
https://calcite.apache.org/docs/reference.html

csv_table.select("col1,CONCAT('field1:',col2,',field2:',CAST(col3 AS
string))");
col3 is actually described as int the CSV schema and CONCAT doesn't like it.

An exception is thrown "Undefined function: CAST"

The docs mention that SQL implementation is based on Calcite and is there a
list of available functions please?
May I skip some useful dependency?


Thanks in advance

François


Re: Table API: get Schema from TableSchema, TypeInformation[] or Avro

2018-08-31 Thread françois lacombe
Ok, looking forward reading your document.

All the best
François



2018-08-31 9:09 GMT+02:00 Timo Walther :

> Thanks for your response. I think we won't need this utility in the near
> future. As mentioned, I'm working on a design document that allows for
> better abstraction. I think I will publish it next week.
>
> Regards,
> Timo
>
>
> Am 31.08.18 um 08:36 schrieb françois lacombe:
>
> Hi Timo
>
> Yes it helps, thank you.
> I'll start building such an utility method. Are you interested to get the
> source?
>
> According to mapping here : https://ci.apache.org/
> projects/flink/flink-docs-release-1.6/dev/table/connect.
> html#apache-avro-format
> Is there any way to get corresponding TypeInformation of an Avro type
> or should I hard code a List>?
>
> All the best
>
> François
>
> 2018-08-31 8:12 GMT+02:00 Timo Walther :
>
>> Hi,
>>
>> thanks for your feedback. I agree that the the current interfaces are not
>> flexible enough to fit to every use case. The unified connector API is a a
>> very recent feature that still needs some polishing. I'm working on a
>> design document to improve the situation there.
>>
>> For now, you can simply implement some utitilty method that just iterates
>> over column names and types of TableSchema and calls `schema.field(name,
>> type)`
>>
>> I hope this helps.
>>
>> Regards,
>> Timo
>>
>> Am 31.08.18 um 07:40 schrieb françois lacombe:
>>
>> Hi all,
>>>
>>> Today I'm looking into derivating an Avro schema json string into a
>>> Schema object.
>>> In the overview of https://ci.apache.org/projects
>>> /flink/flink-docs-release-1.6/dev/table/connect.html Avro is used as a
>>> format and never as a schema.
>>>
>>> This was a topic in JIRA-9813
>>> I can get a TableSchema with TableSchema schema =
>>> TableSchema.fromTypeInfo(AvroSchemaConverter.convertToTypeInfo(sch_csv.toString()));
>>> but I can't use it with BatchTableDescriptor.withSchema().
>>>
>>> How can I get a Schema from TableSchema, TypeInformation[] or even
>>> Avro json string?
>>> A little bridge is missing between TableSchema and
>>> org.apache.flink.table.descriptors.Schema it seems.
>>>
>>> Thanks in advance for any useful hint
>>>
>>> François
>>>
>>
>>
>>
>
>


Re: Table API: get Schema from TableSchema, TypeInformation[] or Avro

2018-08-30 Thread françois lacombe
Hi Timo

Yes it helps, thank you.
I'll start building such an utility method. Are you interested to get the
source?

According to mapping here :
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html#apache-avro-format
Is there any way to get corresponding TypeInformation of an Avro type or
should I hard code a List>?

All the best

François

2018-08-31 8:12 GMT+02:00 Timo Walther :

> Hi,
>
> thanks for your feedback. I agree that the the current interfaces are not
> flexible enough to fit to every use case. The unified connector API is a a
> very recent feature that still needs some polishing. I'm working on a
> design document to improve the situation there.
>
> For now, you can simply implement some utitilty method that just iterates
> over column names and types of TableSchema and calls `schema.field(name,
> type)`
>
> I hope this helps.
>
> Regards,
> Timo
>
> Am 31.08.18 um 07:40 schrieb françois lacombe:
>
> Hi all,
>>
>> Today I'm looking into derivating an Avro schema json string into a
>> Schema object.
>> In the overview of https://ci.apache.org/projects
>> /flink/flink-docs-release-1.6/dev/table/connect.html Avro is used as a
>> format and never as a schema.
>>
>> This was a topic in JIRA-9813
>> I can get a TableSchema with TableSchema schema =
>> TableSchema.fromTypeInfo(AvroSchemaConverter.convertToTypeInfo(sch_csv.toString()));
>> but I can't use it with BatchTableDescriptor.withSchema().
>>
>> How can I get a Schema from TableSchema, TypeInformation[] or even
>> Avro json string?
>> A little bridge is missing between TableSchema and
>> org.apache.flink.table.descriptors.Schema it seems.
>>
>> Thanks in advance for any useful hint
>>
>> François
>>
>
>
>


Table API: get Schema from TableSchema, TypeInformation[] or Avro

2018-08-30 Thread françois lacombe
Hi all,

Today I'm looking into derivating an Avro schema json string into a Schema
object.
In the overview of
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html
Avro is used as a format and never as a schema.

This was a topic in JIRA-9813
I can get a TableSchema with TableSchema schema =
TableSchema.fromTypeInfo(AvroSchemaConverter.convertToTypeInfo(sch_csv.toString()));
but I can't use it with BatchTableDescriptor.withSchema().

How can I get a Schema from TableSchema, TypeInformation[] or even Avro
json string?
A little bridge is missing between TableSchema and
org.apache.flink.table.descriptors.Schema it seems.

Thanks in advance for any useful hint

François


Re: withFormat(Csv) is undefined for the type BatchTableEnvironment

2018-08-30 Thread françois lacombe
Hi

It's all good, I've misunderstood some points in the example codes.
All is working fine with BatchTableDescriptor

All the best

François

2018-08-30 11:40 GMT+02:00 Timo Walther :

> Hi François,
>
> you should read the documentation from top to bottom. The overview part
> [1] explains how everything plays together with examples.
>
> Regards,
> Timo
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> master/dev/table/connect.html#overview
>
> Am 30.08.18 um 10:41 schrieb Till Rohrmann:
>
> Hi François,
>
> as Vino said, the BatchTableEnvironment does not provide a `withFormat`
> method. Admittedly, the documentation does not state it too explicitly but
> you can only call the `withFormat` method on a table connector as indicated
> here [1]. If you think that you need to get the data from somewhere first
> before defining a format, then it becomes clear that you first need to
> define a connector.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.6/dev/table/connect.html#table-formats
>
> Cheers,
> Till
>
> On Thu, Aug 30, 2018 at 4:46 AM vino yang  wrote:
>
>> Hi francois,
>>
>> Maybe you can refer to the comments of this source code?[1]
>>
>> https://github.com/apache/flink/blob/master/flink-
>> libraries/flink-table/src/main/scala/org/apache/flink/table/api/
>> BatchTableEnvironment.scala#L143
>>
>> Thanks, vino.
>>
>> françois lacombe  于2018年8月29日周三 下午10:54写道:
>>
>>> Hi Vino,
>>>
>>> Thanks for this answer.
>>> I can't find in the docs where it's about BatchTableDescriptor
>>> https://ci.apache.org/projects/flink/flink-docs-
>>> release-1.6/dev/table/connect.html#csv-format
>>>
>>> It sounds like the withFormat method is applied on TableEnvironment
>>> object on this page.
>>>
>>> All the best
>>>
>>> François
>>>
>>> 2018-08-28 4:37 GMT+02:00 vino yang :
>>>
>>>> Hi Francois,
>>>>
>>>> Yes, the withFormat API comes from an instance of BatchTableDescriptor,
>>>> and the BatchTableDescriptor instance is returned by the connect API, so
>>>> you should call BatchTableEnvironment#connect first.
>>>>
>>>> Thanks, vino.
>>>>
>>>> françois lacombe  于2018年8月27日周一
>>>> 下午10:26写道:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm currently trying to load a CSV file content with Flink 1.6.0 table
>>>>> API.
>>>>> This error is raised as a try to execute the code written in docs
>>>>> https://ci.apache.org/projects/flink/flink-docs-
>>>>> release-1.6/dev/table/connect.html#csv-format
>>>>>
>>>>> ExecutionEnvironment env = ExecutionEnvironment.
>>>>> getExecutionEnvironment();
>>>>> BatchTableEnvironment tEnv = TableEnvironment.
>>>>> getTableEnvironment(env);
>>>>> tEnv.withFormat(new Csv(...));
>>>>>
>>>>> > Exception in thread "main" java.lang.Error: Unresolved compilation
>>>>> problem:
>>>>>The method withFormat(Csv) is undefined for the type
>>>>> BatchTableEnvironment
>>>>>
>>>>> Am I wrong?
>>>>>
>>>>> Thanks in advance for any hint
>>>>>
>>>>> François
>>>>>
>>>>
>>>
>


Re: withFormat(Csv) is undefined for the type BatchTableEnvironment

2018-08-29 Thread françois lacombe
Hi Vino,

Thanks for this answer.
I can't find in the docs where it's about BatchTableDescriptor
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html#csv-format

It sounds like the withFormat method is applied on TableEnvironment object
on this page.

All the best

François

2018-08-28 4:37 GMT+02:00 vino yang :

> Hi Francois,
>
> Yes, the withFormat API comes from an instance of BatchTableDescriptor,
> and the BatchTableDescriptor instance is returned by the connect API, so
> you should call BatchTableEnvironment#connect first.
>
> Thanks, vino.
>
> françois lacombe  于2018年8月27日周一 下午10:26写道:
>
>> Hi all,
>>
>> I'm currently trying to load a CSV file content with Flink 1.6.0 table
>> API.
>> This error is raised as a try to execute the code written in docs
>> https://ci.apache.org/projects/flink/flink-docs-
>> release-1.6/dev/table/connect.html#csv-format
>>
>> ExecutionEnvironment env = ExecutionEnvironment.
>> getExecutionEnvironment();
>> BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
>> tEnv.withFormat(new Csv(...));
>>
>> > Exception in thread "main" java.lang.Error: Unresolved compilation
>> problem:
>>The method withFormat(Csv) is undefined for the type
>> BatchTableEnvironment
>>
>> Am I wrong?
>>
>> Thanks in advance for any hint
>>
>> François
>>
>


withFormat(Csv) is undefined for the type BatchTableEnvironment

2018-08-27 Thread françois lacombe
Hi all,

I'm currently trying to load a CSV file content with Flink 1.6.0 table API.
This error is raised as a try to execute the code written in docs
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html#csv-format

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
tEnv.withFormat(new Csv(...));

> Exception in thread "main" java.lang.Error: Unresolved compilation
problem:
   The method withFormat(Csv) is undefined for the type
BatchTableEnvironment

Am I wrong?

Thanks in advance for any hint

François


Re: AvroSchemaConverter and Tuple classes

2018-08-27 Thread françois lacombe
Thank you all for you answers.

It's ok with BatchTableSource


All the best

François

2018-08-26 17:40 GMT+02:00 Rong Rong :

> Yes you should be able to use Row instead of Tuple in your
> BatchTableSink.
> There's sections in Flink documentation regarding mapping of data types to
> table schemas [1]. and table can be converted into various typed DataStream
> [2] as well. Hope these are helpful.
>
> Thanks,
> Rong
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.6/dev/table/common.html#mapping-of-data-types-to-table-schema
> [2] https://ci.apache.org/projects/flink/flink-docs-
> release-1.6/dev/table/common.html#convert-a-table-into-a-
> datastream-or-dataset
>
>
>
> On Fri, Aug 24, 2018 at 8:04 AM françois lacombe <
> francois.laco...@dcbrain.com> wrote:
>
>> Hi Timo,
>>
>> Thanks for your answer
>> I was looking for a Tuple as to feed a BatchTableSink subclass, but it
>> may be achived with a Row instead?
>>
>> All the best
>>
>> François
>>
>> 2018-08-24 10:21 GMT+02:00 Timo Walther :
>>
>>> Hi,
>>>
>>> tuples are just a sub category of rows. Because the tuple arity is
>>> limited to 25 fields. I think the easiest solution would be to write your
>>> own converter that maps rows to tuples if you know that you will not need
>>> more than 25 fields. Otherwise it might be easier to just use a
>>> TextInputFormat and do the parsing yourself with a library.
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> Am 23.08.18 um 18:54 schrieb françois lacombe:
>>>
>>> Hi all,
>>>>
>>>> I'm looking for best practices regarding Tuple instances creation.
>>>>
>>>> I have a TypeInformation object produced by AvroSchemaConverter.
>>>> convertToTypeInfo("{...}");
>>>> Is this possible to define a corresponding Tuple instance with it?
>>>> (get the T from the TypeInformation)
>>>>
>>>> Example :
>>>> {
>>>>   "type": "record",
>>>>   "fields": [
>>>> { "name": "field1", "type": "int" },
>>>> { "name": "field2", "type": "string"}
>>>> ]}
>>>>  = Tuple2
>>>>
>>>> The same question rises with DataSet or other any record handling class
>>>> with parametrized types.
>>>>
>>>> My goal is to parse several CsvFiles with different structures
>>>> described in an Avro schema.
>>>> It would be great to not hard-code structures in my Java code and only
>>>> get types information at runtime from Avro schemas
>>>>
>>>> Is this possible?
>>>>
>>>> Thanks in advance
>>>>
>>>> François Lacombe
>>>>
>>>
>>>
>>>
>>


Re: AvroSchemaConverter and Tuple classes

2018-08-24 Thread françois lacombe
Hi Timo,

Thanks for your answer
I was looking for a Tuple as to feed a BatchTableSink subclass, but it
may be achived with a Row instead?

All the best

François

2018-08-24 10:21 GMT+02:00 Timo Walther :

> Hi,
>
> tuples are just a sub category of rows. Because the tuple arity is limited
> to 25 fields. I think the easiest solution would be to write your own
> converter that maps rows to tuples if you know that you will not need more
> than 25 fields. Otherwise it might be easier to just use a TextInputFormat
> and do the parsing yourself with a library.
>
> Regards,
> Timo
>
>
> Am 23.08.18 um 18:54 schrieb françois lacombe:
>
> Hi all,
>>
>> I'm looking for best practices regarding Tuple instances creation.
>>
>> I have a TypeInformation object produced by AvroSchemaConverter.convertToT
>> ypeInfo("{...}");
>> Is this possible to define a corresponding Tuple instance with it?
>> (get the T from the TypeInformation)
>>
>> Example :
>> {
>>   "type": "record",
>>   "fields": [
>> { "name": "field1", "type": "int" },
>> { "name": "field2", "type": "string"}
>> ]}
>>  = Tuple2
>>
>> The same question rises with DataSet or other any record handling class
>> with parametrized types.
>>
>> My goal is to parse several CsvFiles with different structures described
>> in an Avro schema.
>> It would be great to not hard-code structures in my Java code and only
>> get types information at runtime from Avro schemas
>>
>> Is this possible?
>>
>> Thanks in advance
>>
>> François Lacombe
>>
>
>
>


AvroSchemaConverter and Tuple classes

2018-08-23 Thread françois lacombe
Hi all,

I'm looking for best practices regarding Tuple instances creation.

I have a TypeInformation object produced by
AvroSchemaConverter.convertToTypeInfo("{...}");
Is this possible to define a corresponding Tuple instance with it? (get
the T from the TypeInformation)

Example :
{
  "type": "record",
  "fields": [
{ "name": "field1", "type": "int" },
{ "name": "field2", "type": "string"}
]}
 = Tuple2

The same question rises with DataSet or other any record handling class
with parametrized types.

My goal is to parse several CsvFiles with different structures described in
an Avro schema.
It would be great to not hard-code structures in my Java code and only get
types information at runtime from Avro schemas

Is this possible?

Thanks in advance

François Lacombe


[1.6.0] Read content of Csv file with new table connectors

2018-08-13 Thread françois lacombe
Hi all,

Following the relase of Flink 1.6.0, I'm trying to setup a simple csv file
reader as to load its content in a sql table.
I'm using the new table connectors and this code recommended in Jira
FLINK-9813 and this doc :
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html#connectors

Csv csv_format = new   Csv().schema(
TableSchema.fromTypeInfo(AvroSchemaConverter.convertToTypeInfo("My avro
schema text")));

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
tEnv.withFormat(csv_format);
tEnv.connect(new FileSystem().path("my_local_file.csv"));
Table csv_table = tEnv.scan();

Is this right or I'm missing something?
I'm not sure format, file path etc should be put in TableEnvironment object.
Should I get as many TableEnvironment objects as I have different csv files?

Thanks in advance for your input, all the best

François Lacombe


Re: Filter columns of a csv file with Flink

2018-07-11 Thread françois lacombe
Ok Hequn,

I'll open 2 Jira for this issue, and maybe propose a draft of
CsvTableSource class handling avro schemas
FLINK-9813 and FLINK-9814

Thank you for your answers and best regards

François

2018-07-11 8:11 GMT+02:00 Hequn Cheng :

> Hi francois,
>
> > Is there any plan to give avro schemas a better role in Flink in
> further versions?
> Haven't heard about avro for csv. You can open a jira for it. Maybe also
> contribute to flink :-)
>
>
> On Tue, Jul 10, 2018 at 11:32 PM, françois lacombe <
> francois.laco...@dcbrain.com> wrote:
>
>> Hi Hequn,
>>
>> 2018-07-10 3:47 GMT+02:00 Hequn Cheng :
>>
>>> Maybe I misunderstand you. So you don't want to skip the whole file?
>>>
>> Yes I do
>> By skipping the whole file I mean "throw an Exception to stop the process
>> and inform user that file is invalid for a given reason" and not "the
>> process goes fully right and import 0 rows"
>>
>>
>>> If does, then "extending CsvTableSource and provide the avro schema to
>>> the constructor without creating a custom AvroInputFormat" is ok.
>>>
>>
>> Then we agree on this
>> Is there any plan to give avro schemas a better role in Flink in further
>> versions?
>> Avro schemas are perfect to build CSVTableSource with code like
>>
>> for (Schema field_nfo : sch.getTypes()){
>>  // Test if csv file header actually contains a field corresponding
>> to schema
>>  if (!csv_headers.contains(field_nfo.getName())) {
>>   throw new NoSuchFieldException(field_nfo.getName());
>>  }
>>
>>  // Declare the field in the source Builder
>>  src_builder.field(field_nfo.getName(),
>> primitiveTypes.get(field_nfo.getType()));
>> }
>>
>> All the best
>>
>> François
>>
>>
>>
>>> On Mon, Jul 9, 2018 at 11:03 PM, françois lacombe <
>>> francois.laco...@dcbrain.com> wrote:
>>>
>>>> Hi Hequn,
>>>>
>>>> 2018-07-09 15:09 GMT+02:00 Hequn Cheng :
>>>>
>>>>> The first step requires an AvroInputFormat because the source needs
>>>>> AvroInputFormat to read avro data if data match schema.
>>>>>
>>>>
>>>> I don't want avro data, I just want to check if my csv file have the
>>>> same fields than defined in a given avro schema.
>>>> Processing should stop if and only if I find missing columns.
>>>>
>>>> A record which not match the schema (types mainly) should be rejected
>>>> and logged in a dedicated file but the processing can go on.
>>>>
>>>> How about extending CsvTableSource and provide the avro schema to the
>>>> constructor without creating a custom AvroInputFormat?
>>>>
>>>>
>>>> François
>>>>
>>>
>>>
>>
>


Re: Filter columns of a csv file with Flink

2018-07-10 Thread françois lacombe
Hi Hequn,

2018-07-10 3:47 GMT+02:00 Hequn Cheng :

> Maybe I misunderstand you. So you don't want to skip the whole file?
>
Yes I do
By skipping the whole file I mean "throw an Exception to stop the process
and inform user that file is invalid for a given reason" and not "the
process goes fully right and import 0 rows"


> If does, then "extending CsvTableSource and provide the avro schema to
> the constructor without creating a custom AvroInputFormat" is ok.
>

Then we agree on this
Is there any plan to give avro schemas a better role in Flink in further
versions?
Avro schemas are perfect to build CSVTableSource with code like

for (Schema field_nfo : sch.getTypes()){
 // Test if csv file header actually contains a field corresponding to
schema
 if (!csv_headers.contains(field_nfo.getName())) {
  throw new NoSuchFieldException(field_nfo.getName());
 }

 // Declare the field in the source Builder
 src_builder.field(field_nfo.getName(),
primitiveTypes.get(field_nfo.getType()));
}

All the best

François



> On Mon, Jul 9, 2018 at 11:03 PM, françois lacombe <
> francois.laco...@dcbrain.com> wrote:
>
>> Hi Hequn,
>>
>> 2018-07-09 15:09 GMT+02:00 Hequn Cheng :
>>
>>> The first step requires an AvroInputFormat because the source needs
>>> AvroInputFormat to read avro data if data match schema.
>>>
>>
>> I don't want avro data, I just want to check if my csv file have the same
>> fields than defined in a given avro schema.
>> Processing should stop if and only if I find missing columns.
>>
>> A record which not match the schema (types mainly) should be rejected and
>> logged in a dedicated file but the processing can go on.
>>
>> How about extending CsvTableSource and provide the avro schema to the
>> constructor without creating a custom AvroInputFormat?
>>
>>
>> François
>>
>
>


Re: Filter columns of a csv file with Flink

2018-07-09 Thread françois lacombe
Hi Hequn,

As CsvTableSource sounds to be optimized for csv parsing I won't question
it too much.

Your second point sounds really better.
I can extend the CsvTableSource with extra Avro schema conflating
capabilities. Then if the csv file header doesn't match the avro schema
specification, then it throws an exception prior to parse the whole csv
Right ?

I plan to check the quality of data in two independent steps :
1 check the "file structure" with number of columns and their names, mainly
dealing with header row. Any error leads to whole file rejection with a
java exception
2 check each row with udfs to get ones that aren't consistent with avro
schema. Any error is logged, but the rest of the file is processed and
loaded.

I guess that first step doesn't require AvroInputFormat but a simple avro's
Schema object and the second would be more efficient with an
AvroInputFormat. Am I right ?

Thanks for useful inputs, all the best

François


2018-07-07 4:20 GMT+02:00 Hequn Cheng :

> Hi francois,
>
> > I see that CsvTableSource allows to define csv fields. Then, will it
> check if columns actually exists in the file and throw Exception if not ?
> Currently, CsvTableSource doesn't support Avro. CsvTableSource
> uses fieldDelim and rowDelim to parse data. But there is a workaround: read
> each line from data as a single big column, i.e., the source table only has
> one column. Afterward, you can use udtf[1] to split each line. You can
> throw away data or throw exceptions in udtf as you wish.
>
> >  I want to check if files structure is right before processing them.
> If you want to skip the whole file when the schema is erroneous. You can
> write a user defined table source and probably have to write a user defined
> InputFormat. You can refer to the AvroInputFormat[2] as an example.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> master/dev/table/udfs.html#table-functions
> [2] https://github.com/apache/flink/blob/master/flink-
> formats/flink-avro/src/main/java/org/apache/flink/formats/
> avro/AvroInputFormat.java
>
> On Fri, Jul 6, 2018 at 11:32 PM, françois lacombe <
> francois.laco...@dcbrain.com> wrote:
>
>> Hi Hequn,
>>
>> The Table-API is really great.
>> I will use and certainly love it to solve the issues I mentioned before
>>
>> One subsequent question regarding Table-API :
>> I've got my csv files and avro schemas that describe them.
>> As my users can send erroneous files, inconsistent with schemas, I want
>> to check if files structure is right before processing them.
>> I see that CsvTableSource allows to define csv fields. Then, will it
>> check if columns actually exists in the file and throw Exception if not ?
>>
>> Or is there any other way in Apache Avro to check if a csv file is
>> consistent with a given schema?
>>
>> Big thank to put on the table-api's way :)
>>
>> Best R
>>
>> François Lacombe
>>
>>
>>
>> 2018-07-06 16:53 GMT+02:00 Hequn Cheng :
>>
>>> Hi francois,
>>>
>>> If I understand correctly, you can use sql or table-api to solve you
>>> problem.
>>> As you want to project part of columns from source, a columnar storage
>>> like parquet/orc would be efficient. Currently, ORC table source is
>>> supported in flink, you can find more details here[1]. Also, there are many
>>> other table sources[2] you can choose. With a TableSource, you can read the
>>> data and register it as a Table and do table operations through sql[3] or
>>> table-api[4].
>>>
>>> To make a json string from several columns, you can write a user defined
>>> function[5].
>>>
>>> I also find a OrcTableSourceITCase[6] which I think may be helpful for
>>> you.
>>>
>>> Best, Hequn
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/d
>>> ev/table/sourceSinks.html#orctablesource
>>> [2] https://ci.apache.org/projects/flink/flink-docs-master/d
>>> ev/table/sourceSinks.html#table-sources-sinks
>>> [3] https://ci.apache.org/projects/flink/flink-docs-master/d
>>> ev/table/sql.html
>>> [4] https://ci.apache.org/projects/flink/flink-docs-master/d
>>> ev/table/tableApi.html
>>> [5] https://ci.apache.org/projects/flink/flink-docs-master/d
>>> ev/table/udfs.html
>>> [6] https://github.com/apache/flink/blob/master/flink-connec
>>> tors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSo
>>> urceITCase.java
>>>
>>>
>>> On Fri, Jul 6, 2018 at 9:48 PM, françois lacombe <
>>> francois.laco...@dcbra

Re: Filter columns of a csv file with Flink

2018-07-06 Thread françois lacombe
Hi Hequn,

The Table-API is really great.
I will use and certainly love it to solve the issues I mentioned before

One subsequent question regarding Table-API :
I've got my csv files and avro schemas that describe them.
As my users can send erroneous files, inconsistent with schemas, I want to
check if files structure is right before processing them.
I see that CsvTableSource allows to define csv fields. Then, will it check
if columns actually exists in the file and throw Exception if not ?

Or is there any other way in Apache Avro to check if a csv file is
consistent with a given schema?

Big thank to put on the table-api's way :)

Best R

François Lacombe



2018-07-06 16:53 GMT+02:00 Hequn Cheng :

> Hi francois,
>
> If I understand correctly, you can use sql or table-api to solve you
> problem.
> As you want to project part of columns from source, a columnar storage
> like parquet/orc would be efficient. Currently, ORC table source is
> supported in flink, you can find more details here[1]. Also, there are many
> other table sources[2] you can choose. With a TableSource, you can read the
> data and register it as a Table and do table operations through sql[3] or
> table-api[4].
>
> To make a json string from several columns, you can write a user defined
> function[5].
>
> I also find a OrcTableSourceITCase[6] which I think may be helpful for
> you.
>
> Best, Hequn
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> master/dev/table/sourceSinks.html#orctablesource
> [2] https://ci.apache.org/projects/flink/flink-docs-
> master/dev/table/sourceSinks.html#table-sources-sinks
> [3] https://ci.apache.org/projects/flink/flink-docs-
> master/dev/table/sql.html
> [4] https://ci.apache.org/projects/flink/flink-docs-
> master/dev/table/tableApi.html
> [5] https://ci.apache.org/projects/flink/flink-docs-
> master/dev/table/udfs.html
> [6] https://github.com/apache/flink/blob/master/flink-
> connectors/flink-orc/src/test/java/org/apache/flink/orc/
> OrcTableSourceITCase.java
>
>
> On Fri, Jul 6, 2018 at 9:48 PM, françois lacombe <
> francois.laco...@dcbrain.com> wrote:
>
>> Hi all,
>>
>> I'm a new user to Flink community. This tool sounds great to achieve some
>> data loading of millions-rows files into a pgsql db for a new project.
>>
>> As I read docs and examples, a proper use case of csv loading into pgsql
>> can't be found.
>> The file I want to load isn't following the same structure than the
>> table, I have to delete some columns and make a json string from several
>> others too prior to load to pgsql
>>
>> I plan to use Flink 1.5 Java API and a batch process.
>> Does the DataSet class is able to strip some columns out of the records I
>> load or should I iterate over each record to delete the columns?
>>
>> Same question to make a json string from several columns of the same
>> record?
>> E.g json_column =3D {"field1":col1, "field2":col2...}
>>
>> I work with 20 millions length files and it sounds pretty ineffective to
>> iterate over each records.
>> Can someone tell me if it's possible or if I have to change my mind about
>> this?
>>
>>
>> Thanks in advance, all the best
>>
>> François Lacombe
>>
>>
>


Filter columns of a csv file with Flink

2018-07-06 Thread françois lacombe
Hi all,

I'm a new user to Flink community. This tool sounds great to achieve some
data loading of millions-rows files into a pgsql db for a new project.

As I read docs and examples, a proper use case of csv loading into pgsql
can't be found.
The file I want to load isn't following the same structure than the table,
I have to delete some columns and make a json string from several others
too prior to load to pgsql

I plan to use Flink 1.5 Java API and a batch process.
Does the DataSet class is able to strip some columns out of the records I
load or should I iterate over each record to delete the columns?

Same question to make a json string from several columns of the same record?
E.g json_column =3D {"field1":col1, "field2":col2...}

I work with 20 millions length files and it sounds pretty ineffective to
iterate over each records.
Can someone tell me if it's possible or if I have to change my mind about
this?


Thanks in advance, all the best

François Lacombe