zhangjun created FLINK-12584:
--------------------------------

             Summary: Add Bucket File Syetem Table Sink
                 Key: FLINK-12584
                 URL: https://issues.apache.org/jira/browse/FLINK-12584
             Project: Flink
          Issue Type: New Feature
          Components: Table SQL / API
    Affects Versions: 1.8.0, 1.9.0
            Reporter: zhangjun
            Assignee: zhangjun


#  ** *Motivation*

In flink, the file system (especially hdfs) is a very common output, but for 
users using sql, it does not support directly using sql to write data to the 
file system, so I want to add a bucket file system table sink, the user can 
register it to StreamTableEnvironment, so that table api and sql api can 
directly use the sink to write stream data to filesystem
 #  ** *example*

tEnv.connect(new Bucket().basePath("hdfs://localhost/tmp/flink-data"))

                   .withFormat(new Json().deriveSchema())

                   .withSchema(new Schema()

                          .field("name", Types. STRING ())

                          .field("age", Types. INT ())

                   .inAppendMode()

                   .registerTableSink("myhdfssink");

tEnv.sqlUpdate("insert into myhdfssink SELECT * FROM mytablesource");

 
 #  ** *Some ideas to achieve this function*

1) Add a class Bucket which extends from ConnectorDescriptor, add some 
properties, such as basePath.

2) Add a class BucketValidator which extends from the 
ConnectorDescriptorValidator and is used to check the bucket descriptor.

3) Add a class FileSystemTableSink to implement the StreamTableSink interface.  
In the emitDataStream method, construct StreamingFileSink for writing data to 
filesystem according to different properties.

4) Add a factory class FileSystemTableSinkFactory to implement the 
StreamTableSinkFactory interface for constructing FileSystemTableSink

5) The parameters of withFormat method is the implementation classes of the 
FormatDescriptor interface, such as Json, Csv, and we can add Parquet、Orc later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to