[ 
https://issues.apache.org/jira/browse/FLINK-12584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Zhang closed FLINK-12584.
-----------------------------
    Resolution: Won't Fix

> Add Bucket File System Table Sink
> ---------------------------------
>
>                 Key: FLINK-12584
>                 URL: https://issues.apache.org/jira/browse/FLINK-12584
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API
>    Affects Versions: 1.8.0, 1.9.0
>            Reporter: Jun Zhang
>            Assignee: Jun Zhang
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> 1. *Motivation*
> In Flink, the file system (especially HDFS) is a very common output target, 
> but users working in SQL cannot write data to it directly. I want to add a 
> bucket file system table sink that users can register with a 
> StreamTableEnvironment, so that the Table API and SQL can write streaming 
> data to the file system directly.
> 2. *Example*
> tEnv.connect(new Bucket().basePath("hdfs://localhost/tmp/flink-data"))
>                    .withFormat(new Json().deriveSchema())
>                    .withSchema(new Schema()
>                           .field("name", Types.STRING())
>                           .field("age", Types.INT()))
>                    .inAppendMode()
>                    .registerTableSink("myhdfssink");
> tEnv.sqlUpdate("insert into myhdfssink SELECT * FROM mytablesource");
>
> 3. *Some ideas to achieve this function*
> 1) Add a class Bucket that extends ConnectorDescriptor and carries 
> properties such as basePath.
> 2) Add a class BucketValidator that extends ConnectorDescriptorValidator 
> and checks the bucket descriptor.
> 3) Add a class FileSystemTableSink that implements the StreamTableSink 
> interface. In its emitDataStream method, construct a StreamingFileSink that 
> writes data to the file system according to the configured properties.
> 4) Add a factory class FileSystemTableSinkFactory that implements the 
> StreamTableSinkFactory interface and constructs the FileSystemTableSink.
> 5) The withFormat method takes a FormatDescriptor implementation such as 
> Json or Csv; Parquet and Orc can be added later.
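
Ideas 1) and 2) above could look roughly like the following. This is a plain-Java model only, not the code from the pull request: it uses no Flink classes, and the class name BucketSketch, the property keys "connector.type" and "connector.basepath", and the validation rules are all assumptions made for illustration. It shows a fluent descriptor being flattened into string properties and a validator rejecting a descriptor that has no base path.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of ideas 1) and 2); no Flink classes are used.
// All class names and property keys here are hypothetical.
final class BucketSketch {

    // Hypothetical property keys a sink factory could match on.
    static final String CONNECTOR_TYPE = "connector.type";
    static final String CONNECTOR_BASE_PATH = "connector.basepath";

    /** Stand-in for the proposed Bucket descriptor: collects settings fluently. */
    static final class Bucket {
        private String basePath;

        Bucket basePath(String path) {
            this.basePath = path;
            return this;
        }

        /** Flattens the descriptor into string key/value properties. */
        Map<String, String> toProperties() {
            Map<String, String> props = new LinkedHashMap<>();
            props.put(CONNECTOR_TYPE, "bucket");
            if (basePath != null) {
                props.put(CONNECTOR_BASE_PATH, basePath);
            }
            return props;
        }
    }

    /** Stand-in for the proposed BucketValidator: rejects incomplete descriptors. */
    static void validate(Map<String, String> props) {
        if (!"bucket".equals(props.get(CONNECTOR_TYPE))) {
            throw new IllegalArgumentException("Expected " + CONNECTOR_TYPE + "=bucket");
        }
        String basePath = props.get(CONNECTOR_BASE_PATH);
        if (basePath == null || basePath.trim().isEmpty()) {
            throw new IllegalArgumentException(
                    "Missing required property " + CONNECTOR_BASE_PATH);
        }
    }

    public static void main(String[] args) {
        Map<String, String> props =
                new Bucket().basePath("hdfs://localhost/tmp/flink-data").toProperties();
        validate(props); // throws if the descriptor is incomplete
        System.out.println(props);
    }
}
```

In a real implementation the validated properties would then be handed to the sink factory from idea 4), which would build the FileSystemTableSink from them.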



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
