[ https://issues.apache.org/jira/browse/FLINK-12584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jun Zhang closed FLINK-12584.
-----------------------------
    Resolution: Won't Fix

> Add Bucket File System Table Sink
> ---------------------------------
>
>                 Key: FLINK-12584
>                 URL: https://issues.apache.org/jira/browse/FLINK-12584
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API
>    Affects Versions: 1.8.0, 1.9.0
>            Reporter: Jun Zhang
>            Assignee: Jun Zhang
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> 1. *Motivation*
> In Flink, the file system (especially HDFS) is a very common output
> target, but users working in SQL cannot write data to it directly.
> I therefore want to add a bucket file system table sink that users can
> register with a StreamTableEnvironment, so that the Table API and the
> SQL API can use the sink directly to write stream data to the file system.
>
> 2. *Example*
> tEnv.connect(new Bucket().basePath("hdfs://localhost/tmp/flink-data"))
>     .withFormat(new Json().deriveSchema())
>     .withSchema(new Schema()
>         .field("name", Types.STRING())
>         .field("age", Types.INT()))
>     .inAppendMode()
>     .registerTableSink("myhdfssink");
> tEnv.sqlUpdate("insert into myhdfssink SELECT * FROM mytablesource");
>
> 3. *Some ideas to achieve this function*
> 1) Add a class Bucket that extends ConnectorDescriptor and carries the
> connector properties, such as basePath.
> 2) Add a class BucketValidator that extends ConnectorDescriptorValidator
> and is used to check the bucket descriptor.
> 3) Add a class FileSystemTableSink that implements the StreamTableSink
> interface. In the emitDataStream method, construct a StreamingFileSink
> that writes data to the file system according to the configured properties.
> 4) Add a factory class FileSystemTableSinkFactory that implements the
> StreamTableSinkFactory interface and constructs the FileSystemTableSink.
> 5) The parameter of the withFormat method is an implementation of the
> FormatDescriptor interface, such as Json or Csv; Parquet and Orc can be
> added later.