Hi Jingsong,

I am looking forward to this feature, because some streaming applications
need to transfer their messages to HDFS for offline analysis.
Best wishes,
LakeShen

Stephan Ewen <se...@apache.org> wrote on Tue, Mar 17, 2020 at 7:42 PM:

> I would really like to see us converging the stack and the functionality
> here. Meaning to try and use the same sinks in the Table API as for the
> DataStream API, and using the same sink for batch and streaming.
>
> The StreamingFileSink has a lot of things that can help with that. If
> possible, it would be nice to extend it (which would help move towards
> the above goal) rather than build a second sink. Building a second sink
> leads us further away from unification.
>
> I am a bit puzzled by the statement that sinks are primarily for Hive.
> The Table API should not be coupled to Hive; it should be an independent
> batch/streaming API for many use cases, supporting batch and streaming
> interplay very well. Supporting Hive is great, but we should not be
> building this towards Hive, as just yet another Hive runtime. Why "yet
> another Hive runtime" when what we have is a unique streaming engine that
> can do much more? We would drop our own strength and reduce ourselves to
> a limited subset.
>
> Let's build a File Sink that can also support Hive, but can do so much
> more. For example, efficient streaming file ingestion as materialized
> views from changelogs.
>
>
> *## Writing Files in Streaming*
>
> To write files in streaming, I don't see another way than using the
> streaming file sink. If you want to write files across checkpoints,
> support exactly-once, and support consistent "stop with savepoint", it is
> not trivial.
>
> A part of the complexity comes from the fact that not all targets are
> actually file systems, and not all have simple semantics for persistence.
> S3, for example, does not support renames (only copies, which may take a
> lot of time) and it does not support flush/sync of data (the S3 file
> system in Hadoop exposes that, but it does not work: flush/sync followed
> by a failure leads to data loss). You need to devise a separate protocol
> for that, which is exactly what has already been done and abstracted
> behind the recoverable writers.
>
> If you re-engineer that in the Table API, you will end up either missing
> many things (intermediate persistence on different file systems, atomic
> commit in the absence of renames, etc.), or you end up doing something
> similar to what the recoverable writers do.
>
>
> *## Atomic Commit in Batch*
>
> For batch sinks, it is also desirable to write the data first and then
> atomically commit it once the job is done.
> Hadoop has spent a lot of time making this work; see the doc here,
> specifically the section on 'The "Magic" Committer'. [1]
>
> What Flink has built in the RecoverableWriter is in some ways an even
> better version of this, because it works without extra files (we pass
> data through checkpoint state) and it supports not only committing once
> at the end, but also committing intermediate parts multiple times during
> checkpoints.
>
> Meaning that using the recoverable writer mechanism in batch would allow
> us to immediately get the efficient atomic commit implementations on
> file://, hdfs://, and s3://, with a well-defined way to implement it also
> for other file systems.
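
For reference, the recoverable-writer protocol described above looks roughly
like this at the API level. This is a minimal sketch assuming Flink's
org.apache.flink.core.fs.RecoverableWriter interface (around Flink 1.10);
the path and payload are illustrative:

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
    import org.apache.flink.core.fs.RecoverableWriter;

    public class RecoverableWriterSketch {

        public static void main(String[] args) throws Exception {
            // Illustrative target; on s3:// this maps to multi-part uploads.
            Path target = new Path("hdfs:///tmp/part-0-0");
            FileSystem fs = target.getFileSystem();

            // The writer hides the file-system-specific persistence protocol.
            RecoverableWriter writer = fs.createRecoverableWriter();
            RecoverableFsDataOutputStream out = writer.open(target);

            out.write("some records".getBytes(StandardCharsets.UTF_8));

            // At a checkpoint: make the data written so far durable and put
            // the resumable handle into checkpoint state (no rename needed).
            RecoverableWriter.ResumeRecoverable resumable = out.persist();

            // When the part file is finished: close it and publish it
            // atomically through the committer.
            RecoverableFsDataOutputStream.Committer committer = out.closeForCommit();
            committer.commit();
        }
    }

After a failure, the stream can be reopened from the ResumeRecoverable via
writer.recover(...), which is what makes exactly-once across checkpoints
possible even without working flush/sync or renames.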
>
>
> *## Batch / Streaming Unification*
>
> It would be great to start looking at these things in the same way:
> - streaming (exactly-once): commits files (after they are finished) at
>   the next checkpoint
> - batch: single commit at the end of the job
>
>
> *## DataStream / Table API Stack Unification*
>
> Having the same set of capabilities would make it much easier for users
> to understand the system, especially when it comes to consistent behavior
> across external systems. Having a different file sink in the Table API
> and the DataStream API means that DataStream can write correctly to S3
> while the Table API cannot.
>
>
> *## What is missing?*
>
> It seems there are some things that get in the way of naturally extending
> the StreamingFileSink. Can you make a list of what features are missing
> in the StreamingFileSink that would make it usable for the use cases you
> have in mind?
>
> Best,
> Stephan
>
> [1]
> https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committer_architecture.html
>
>
> On Mon, Mar 16, 2020 at 12:31 PM Jingsong Li <jingsongl...@gmail.com>
> wrote:
>
> > Hi Piotr,
> >
> > I am quite torn on this.
> >
> > Let me re-list the table streaming sink requirements:
> > - In Table, maybe 90% of sinks are for Hive, and Parquet and ORC are
> > the most important formats. Hive provides RecordWriters; it is easy to
> > support all Hive formats by using them, and we don't need to worry
> > about Hive version compatibility either, but they cannot work with
> > FSDataOutputStream.
> > - A Hive table may use an external HDFS. That means Hive has its own
> > Hadoop configuration.
> > - In Table, partition commit is needed. We cannot just move files;
> > updating the catalog is important to complete the table semantics.
> >
> > You are right that the DataStream and Table streaming sinks will not be
> > fully compatible, each with its own set of limitations, quirks and
> > features. But if we reuse DataStream, batch and streaming also will not
> > be fully compatible. Providing a unified experience across batch and
> > streaming is also important.
> >
> > Table and DataStream have different concerns, and they tilt in
> > different directions.
> >
> > Of course, it would be very good to see a unified implementation that
> > solves the batch sink and the Hive requirements, and that unifies the
> > DataStream batch and streaming sinks as well as the Table batch and
> > streaming sinks.
> >
> > Let's see what others think.
> >
> > Best,
> > Jingsong Lee
> >
> >
> > On Mon, Mar 16, 2020 at 4:15 PM Piotr Nowojski <pi...@ververica.com>
> > wrote:
> >
> > > Hi Jingsong,
> > >
> > > > First way is reusing the batch sink from FLINK-14254. It has
> > > > handled the partition and metastore logic well.
> > > > - unify batch and streaming
> > > > - Using FileOutputFormat is consistent with FileInputFormat.
> > > > - Add exactly-once related logic. Just 200+ lines of code.
> > > > - It's natural to support more table features, like partition
> > > > commit, auto compaction, etc.
> > > >
> > > > Second way is reusing the DataStream StreamingFileSink:
> > > > - unify the streaming sink between Table and DataStream.
> > > > - It may be hard to introduce table-related features to
> > > > StreamingFileSink.
> > > >
> > > > I prefer the first way a little. What do you think?
> > >
> > > I would be surprised if adding "exactly-once related logic" is just
> > > 200 lines of code. There are things like multi-part file upload to
> > > S3, and there are also some pending features like [1]. I would
> > > suggest to ask/involve Klou in this discussion.
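
For readers following the thread, the StreamingFileSink whose reuse is being
debated is used roughly like this. A minimal sketch against the Flink 1.10
DataStream API; the source, output path and hourly bucketing are
illustrative, not part of the proposal:

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

    public class StreamingFileSinkSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpoints drive when in-progress files are committed and
            // become visible, via the recoverable writers described above.
            env.enableCheckpointing(60_000);

            DataStream<String> records = env.socketTextStream("localhost", 9999);

            // Row-encoded sink; the bucket assigner plays a role similar to
            // table partitioning (here: one bucket per hour).
            StreamingFileSink<String> sink = StreamingFileSink
                    .forRowFormat(new Path("hdfs:///warehouse/output"),
                            new SimpleStringEncoder<String>("UTF-8"))
                    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
                    .build();

            records.addSink(sink);
            env.execute("streaming-file-sink-sketch");
        }
    }

What the thread is weighing is whether the table-specific pieces (partition
commit, metastore updates, Hive RecordWriters) can be layered on top of this
sink or are easier to add to the batch sink from FLINK-14254.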
> > >
> > > If it's as easy to support exactly-once streaming with the current
> > > batch sink, that begs the question: why do we need to maintain
> > > StreamingFileSink?
> > >
> > > The worst possible outcome from my perspective would be to have
> > > another example of an operator/logic implemented independently both
> > > in the DataStream API and the Table API, because I'm pretty sure they
> > > will not be fully compatible, each with its own set of limitations,
> > > quirks and features. Especially since unifying such kinds of
> > > operators is on our long-term roadmap and wish list.
> > >
> > > Piotrek
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-11499 <
> > > https://issues.apache.org/jira/browse/FLINK-11499>
> > >
> > > > On 16 Mar 2020, at 06:55, Jingsong Li <jingsongl...@gmail.com> wrote:
> > > >
> > > > Thanks Jinhai for joining the discussion.
> > > >
> > > >> we need to add 'connector.sink.username' for UserGroupInformation
> > > >> when data is written to HDFS
> > > >
> > > > Yes, I am not an expert on HDFS, but it seems we need to do this
> > > > "doAs" in the code to access an external HDFS. I will update the
> > > > document.
> > > >
> > > > Best,
> > > > Jingsong Lee
> > > >
> > > > On Mon, Mar 16, 2020 at 12:01 PM Jingsong Li <jingsongl...@gmail.com>
> > > > wrote:
> > > >
> > > >> Thanks Piotr and Yun for joining the discussion.
> > > >>
> > > >> Hi Piotr and Yun, regarding the implementation:
> > > >>
> > > >> FLINK-14254 [1] introduces the batch sink to the table world. It
> > > >> deals with partitions, the metastore, etc., and it just reuses the
> > > >> DataSet/DataStream FileInputFormat and FileOutputFormat. The
> > > >> filesystem connector cannot do without FileInputFormat, because it
> > > >> needs to deal with file handling and split logic. ORC and Parquet,
> > > >> for example, need to read whole files and have different split
> > > >> logic.
> > > >>
> > > >> So back to the file system connector:
> > > >> - It needs to introduce FilesystemTableFactory,
> > > >> FilesystemTableSource and FilesystemTableSink.
> > > >> - For sources, we reuse the DataSet/DataStream FileInputFormats;
> > > >> there is no other interface for file reading.
> > > >>
> > > >> For file sinks:
> > > >> - The batch sink uses FLINK-14254.
> > > >> - The streaming sink has two possible ways.
> > > >>
> > > >> First way is reusing the batch sink from FLINK-14254. It has
> > > >> handled the partition and metastore logic well.
> > > >> - unify batch and streaming
> > > >> - Using FileOutputFormat is consistent with FileInputFormat.
> > > >> - Add exactly-once related logic. Just 200+ lines of code.
> > > >> - It's natural to support more table features, like partition
> > > >> commit, auto compaction, etc.
> > > >>
> > > >> Second way is reusing the DataStream StreamingFileSink:
> > > >> - unify the streaming sink between Table and DataStream.
> > > >> - It may be hard to introduce table-related features to
> > > >> StreamingFileSink.
> > > >>
> > > >> I prefer the first way a little. What do you think?
> > > >>
> > > >> Hi Yun,
> > > >>
> > > >>> the watermark mechanism might not be enough.
> > > >>
> > > >> The watermarks of the subtasks are the same in "snapshotState".
> > > >>
> > > >>> we might need to also do some coordination between subtasks.
> > > >>
> > > >> Yes, the JobMaster is the role that coordinates the subtasks. The
> > > >> metastore is a fragile single point that cannot be accessed in a
> > > >> distributed way, so it is accessed uniformly through the JobMaster.
> > > >>
> > > >> [1] https://issues.apache.org/jira/browse/FLINK-14254
> > > >>
> > > >> Best,
> > > >> Jingsong Lee
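
To make the watermark-based partition commit being discussed here concrete,
below is a hedged sketch of the decision logic it implies. The names
(readyToCommit, the commit delay, the partition end time) are hypothetical,
since the FLIP leaves the exact mechanism open; the point is only that a
partition can be declared complete once the checkpoint-aligned watermark has
passed its time range plus an allowance for late data:

    import java.time.Duration;

    public class PartitionCommitSketch {

        // A partition is complete once the watermark observed at a checkpoint
        // has passed the partition's end time plus a configured late-data delay.
        static boolean readyToCommit(long watermark, long partitionEndTime,
                Duration delay) {
            return watermark >= partitionEndTime + delay.toMillis();
        }

        public static void main(String[] args) {
            long partitionEnd = 1_584_370_800_000L; // e.g. end of dt=2020-03-16/hr=15
            long watermark    = 1_584_371_500_000L; // watermark at the checkpoint

            if (readyToCommit(watermark, partitionEnd, Duration.ofMinutes(10))) {
                // In the design sketched above, this step would run on the
                // JobMaster, which alone talks to the single-point metastore.
                System.out.println("commit partition: add it to the metastore");
            } else {
                System.out.println("partition not yet complete");
            }
        }
    }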
> > > >>
> > > >> On Fri, Mar 13, 2020 at 6:43 PM Yun Gao <yungao...@aliyun.com> wrote:
> > > >>
> > > >>> Hi,
> > > >>>
> > > >>> Many thanks to Jingsong for bringing up this discussion! Enhancing
> > > >>> the FileSystem connector in Table should largely improve
> > > >>> usability.
> > > >>>
> > > >>> I have the same question as Piotr. From my side, I think it would
> > > >>> be better to reuse the existing StreamingFileSink. We have begun
> > > >>> enhancing the supported file formats (e.g., ORC, Avro...), and
> > > >>> reusing StreamingFileSink should avoid repeating that work in the
> > > >>> Table library. Besides, the bucket concept also seems to match
> > > >>> the semantics of partitions.
> > > >>>
> > > >>> For the notification of adding partitions, I wonder whether the
> > > >>> watermark mechanism is enough, since a Bucket/Partition might
> > > >>> span multiple subtasks. It depends on the level of notification:
> > > >>> if we want to notify for the bucket on each subtask, using
> > > >>> watermarks to notify each subtask should be ok, but if we want to
> > > >>> notify for the whole Bucket/Partition, we might need to also do
> > > >>> some coordination between subtasks.
> > > >>>
> > > >>>
> > > >>> Best,
> > > >>> Yun
> > > >>>
> > > >>>
> > > >>>
> > > >>> ------------------------------------------------------------------
> > > >>> From: Piotr Nowojski <pi...@ververica.com>
> > > >>> Send Time: 2020 Mar. 13 (Fri.) 18:03
> > > >>> To: dev <dev@flink.apache.org>
> > > >>> Cc: user <u...@flink.apache.org>; user-zh <user...@flink.apache.org>
> > > >>> Subject: Re: [DISCUSS] FLIP-115: Filesystem connector in Table
> > > >>>
> > > >>> Hi,
> > > >>>
> > > >>> Which actual sinks/sources are you planning to use in this
> > > >>> feature? Is it about exposing StreamingFileSink in the Table API?
> > > >>> Or do you want to implement new Sinks/Sources?
> > > >>>
> > > >>> Piotrek
> > > >>>
> > > >>>> On 13 Mar 2020, at 10:04, jinhai wang <jinhai...@gmail.com> wrote:
> > > >>>>
> > > >>>> Thanks for FLIP-115. It is a really useful feature for platform
> > > >>>> developers who manage hundreds of Flink-to-Hive jobs in
> > > >>>> production.
> > > >>>> I think we need to add 'connector.sink.username' for
> > > >>>> UserGroupInformation when data is written to HDFS.
> > > >>>>
> > > >>>>
> > > >>>> On 2020/3/13 at 3:33 PM, "Jingsong Li" <jingsongl...@gmail.com> wrote:
> > > >>>>
> > > >>>> Hi everyone,
> > > >>>>
> > > >>>> I'd like to start a discussion about FLIP-115: Filesystem
> > > >>>> connector in Table [1].
> > > >>>> This FLIP will bring:
> > > >>>> - Introduce a filesystem table factory in Table, supporting
> > > >>>> csv/parquet/orc/json/avro formats.
> > > >>>> - Introduce a streaming filesystem/hive sink in Table.
> > > >>>>
> > > >>>> CC'ing the user mailing list: if you have any unmet needs, please
> > > >>>> feel free to reply~
> > > >>>>
> > > >>>> Look forward to hearing from you.
> > > >>>>
> > > >>>> [1]
> > > >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
> > > >>>>
> > > >>>> Best,
> > > >>>> Jingsong Lee
> > > >>>
> > > >>
> > > >> --
> > > >> Best, Jingsong Lee
> > > >
> > > > --
> > > > Best, Jingsong Lee
> >
> > --
> > Best, Jingsong Lee
> >
>
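
Finally, on the 'connector.sink.username' point raised in the thread:
impersonation for writes to an external HDFS would look roughly like this
with Hadoop's UserGroupInformation. This is a sketch of the concept only;
the user name, configuration and path are hypothetical, not code from the
FLIP:

    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.UserGroupInformation;

    public class HdfsDoAsSketch {

        public static void main(String[] args) throws Exception {
            // Hypothetical: the value of 'connector.sink.username'.
            UserGroupInformation ugi =
                    UserGroupInformation.createRemoteUser("hive-writer");

            // Every HDFS call inside doAs() runs as that user.
            ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
                // A Hive table on an external HDFS would supply its own
                // Hadoop configuration here.
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get(conf);
                fs.mkdirs(new Path("/warehouse/db/table/dt=2020-03-16"));
                return null;
            });
        }
    }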