Hi Jingsong,

I am looking forward to this feature. Some streaming applications need to
transfer their messages to HDFS for offline analysis.

Best wishes,
LakeShen

On Tue, Mar 17, 2020 at 7:42 PM, Stephan Ewen <se...@apache.org> wrote:

> I would really like to see us converging the stack and the functionality
> here.
> Meaning to try and use the same sinks in the Table API as for the
> DataStream API, and using the same sink for batch and streaming.
>
> The StreamingFileSink has a lot of things that can help with that. If
> possible, it would be nice to extend it (which would help move towards the
> above goal) rather than build a second sink. Building a second sink leads
> us further away from unification.
>
> I am a bit puzzled by the statement that sinks are primarily for Hive. The
> Table API should not be coupled to Hive; it should be an independent
> batch/streaming API for many use cases, with very good support for batch and
> streaming interplay. Supporting Hive is great, but we should not be
> building this towards Hive, as just yet another Hive runtime. Why "yet
> another Hive runtime" when we have a unique streaming engine that can
> do much more? We would drop our own strength and reduce ourselves to a
> limited subset.
>
> Let's build a File Sink that can also support Hive, but can do so much
> more. For example, efficient streaming file ingestion as materialized views
> from changelogs.
>
>
> *## Writing Files in Streaming*
>
> To write files in streaming, I don't see another way than using the
> streaming file sink. If you want to write files across checkpoints, support
> exactly-once, and support consistent "stop with savepoint", it is not
> trivial.
>
> A part of the complexity comes from the fact that not all targets are
> actually file systems, and not all have simple semantics for persistence.
> S3, for example, does not support renames (only copies, which may take a lot
> of time) and it does not support flush/sync of data (the S3 file system in
> Hadoop exposes that, but it does not work: flush/sync followed by a
> failure leads to data loss). You need to devise a separate protocol for
> that, which is exactly what has already been done and abstracted behind the
> recoverable writers.
>
> If you re-engineer that, you will end up either missing many things
> (intermediate persistence on different file systems, atomic commit in
> the absence of renames, etc.), or doing something similar to what the
> recoverable writers do.
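
To make the protocol described above a bit more concrete, here is a minimal Python sketch of the persist / recover / commit idea. It is only a toy model under stated assumptions: `PartFile` and `RecoverableStream` are hypothetical names invented for illustration, the "file" lives in memory, and nothing here is Flink's actual RecoverableWriter API.

```python
from dataclasses import dataclass, field


@dataclass
class PartFile:
    """Hypothetical in-memory stand-in for one part file on a file system."""
    data: bytearray = field(default_factory=bytearray)
    committed: bool = False


class RecoverableStream:
    """Toy model (not a Flink class): write, persist an offset, recover, commit."""

    def __init__(self, part: PartFile):
        self.part = part

    def write(self, payload: bytes) -> None:
        self.part.data.extend(payload)

    def persist(self) -> int:
        # The returned offset is what would be stored in checkpoint state.
        return len(self.part.data)

    @classmethod
    def recover(cls, part: PartFile, offset: int) -> "RecoverableStream":
        # Drop everything written after the last checkpointed offset.
        del part.data[offset:]
        return cls(part)

    def close_for_commit(self) -> None:
        # Publishing is an explicit step; no rename is needed in this model.
        self.part.committed = True


part = PartFile()
stream = RecoverableStream(part)
stream.write(b"a,b\n")
checkpointed_offset = stream.persist()  # taken during a checkpoint
stream.write(b"c,d\n")                  # written after the checkpoint

# Simulated failure: restore from checkpoint state and roll back the tail.
stream = RecoverableStream.recover(part, checkpointed_offset)
stream.write(b"e,f\n")
stream.close_for_commit()
print(bytes(part.data), part.committed)
```

The record written after the checkpoint is discarded on recovery, so the committed file contains only data covered by a checkpoint plus what was written after the restore.
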
>
>
> *## Atomic Commit in Batch*
>
> For batch sinks, it is also desirable to write the data first and then
> atomically commit it once the job is done.
> Hadoop has spent a lot of time making this work, see this doc here,
> specifically the section on 'The "Magic" Committer'. [1]
>
> What Flink has built in the RecoverableWriter is in some ways an even better
> version of this, because it works without extra files (we pass data through
> checkpoint state) and it supports not only committing once at the end, but
> also committing intermediate parts multiple times during checkpoints.
>
> This means that using the recoverable writer mechanism in batch would allow
> us to immediately get the efficient atomic commit implementations on file://,
> hdfs://, and s3://, with a well-defined way to implement it for other
> file systems as well.
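
For the simplest case, a local file:// target, "write first, then commit atomically" can be sketched with a temporary file plus a rename. This is only an illustration under assumptions: `write_then_commit` is a hypothetical helper, and it relies on `os.replace` being atomic within one local file system. As noted above, S3 has no rename, so this approach does not carry over to it.

```python
import os
import tempfile
from typing import Iterable


def write_then_commit(final_path: str, records: Iterable[str]) -> None:
    """Write to a hidden in-progress file, then publish it with one rename."""
    directory = os.path.dirname(final_path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".inprogress-")
    try:
        with os.fdopen(fd, "w") as tmp:
            for record in records:
                tmp.write(record + "\n")
            tmp.flush()
            os.fsync(tmp.fileno())        # make the bytes durable before commit
        os.replace(tmp_path, final_path)  # the atomic "commit" step
    except BaseException:
        # On failure, readers never see a partial file under final_path.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


with tempfile.TemporaryDirectory() as out_dir:
    target = os.path.join(out_dir, "part-0")
    write_then_commit(target, ["a", "b"])
    with open(target) as f:
        committed = f.read()
    leftovers = [n for n in os.listdir(out_dir) if n.startswith(".inprogress-")]
```
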
>
>
> *## Batch / Streaming Unification*
>
> It would be great to start looking at these things in the same way:
>   - streaming (exactly-once): commits finished files at the next checkpoint
>   - batch: single commit at the end of the job
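
The two bullets above can be read as one sink with two commit triggers. The following Python sketch is a toy model of that reading; `FileSinkModel`, `run_streaming` and `run_batch` are hypothetical names, and the "CHECKPOINT" marker is an assumption made only for this illustration.

```python
class FileSinkModel:
    """Toy sink: finished files stay pending until a commit is triggered."""

    def __init__(self):
        self.pending = []
        self.committed = []

    def finish_file(self, name):
        self.pending.append(name)

    def commit_pending(self):
        self.committed.extend(self.pending)
        self.pending.clear()


def run_streaming(events):
    """Streaming: every checkpoint commits the files finished so far."""
    sink = FileSinkModel()
    for event in events:
        if event == "CHECKPOINT":
            sink.commit_pending()
        else:
            sink.finish_file(event)
    return sink


def run_batch(files):
    """Batch: same sink, but a single commit at the end of the job."""
    sink = FileSinkModel()
    for name in files:
        sink.finish_file(name)
    sink.commit_pending()
    return sink


streaming = run_streaming(["f1", "f2", "CHECKPOINT", "f3"])
batch = run_batch(["f1", "f2", "f3"])
print(streaming.committed, streaming.pending)
print(batch.committed, batch.pending)
```

In the streaming run, `f3` is still pending because no checkpoint followed it; in the batch run, everything is committed in one step at the end.
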
>
>
> *## DataStream / Table API Stack Unification*
>
> Having the same set of capabilities would make it much easier for users to
> understand the system.
> Especially when it comes to consistent behavior across external systems.
> Having a different file sink in Table API and DataStream API means that
> DataStream can write correctly to S3 while Table API cannot.
>
>
> *## What is missing?*
>
> It seems there are some things that get in the way of naturally
> Can you make a list of the features that are missing in the StreamingFileSink
> and that would make it usable for the use cases you have in mind?
>
> Best,
> Stephan
>
> [1]
>
> https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committer_architecture.html
>
>
> On Mon, Mar 16, 2020 at 12:31 PM Jingsong Li <jingsongl...@gmail.com>
> wrote:
>
> > Hi Piotr,
> >
> > I am quite torn.
> >
> > Let me re-list the table streaming sink requirements:
> > - In Table, maybe 90% of sinks are for Hive. Parquet and ORC are the most
> > important formats. Hive provides RecordWriters; using them makes it easy to
> > support all Hive formats, and we don't need to worry about Hive version
> > compatibility either, but they cannot work with FSDataOutputStream.
> > - A Hive table may use an external HDFS. This means Hive has its own Hadoop
> > configuration.
> > - In Table, partition commit is needed. We cannot just move files; to
> > complete the table semantics, it is important to update the catalog.
> >
> > You are right that the DataStream and Table streaming sinks will not be fully
> > compatible, each with its own set of limitations, quirks and features.
> > But if we reuse DataStream, batch and streaming will also not be fully
> > compatible. Providing a unified experience for batch and streaming is also
> > important.
> >
> > Table and DataStream have different concerns, and they tilt in different
> > directions.
> >
> > Of course, it would be very good to see a unified implementation that solves
> > the batch sink and Hive requirements, and unifies the DataStream batch sink,
> > DataStream streaming sink, Table batch sink, and Table streaming sink.
> >
> > Let's see what others think.
> >
> > Best,
> > Jingsong Lee
> >
> >
> > On Mon, Mar 16, 2020 at 4:15 PM Piotr Nowojski <pi...@ververica.com>
> > wrote:
> >
> > > Hi Jingsong,
> > >
> > > > First way is reusing the batch sink in FLINK-14254. It has handled the
> > > > partition and metastore logic well.
> > > > - Unifies batch and streaming.
> > > > - Using FileOutputFormat is consistent with FileInputFormat.
> > > > - Adds exactly-once related logic. Just 200+ lines of code.
> > > > - It's natural to support more table features, like partition commit,
> > > > auto compaction, etc.
> > > >
> > > > Second way is reusing the DataStream StreamingFileSink:
> > > > - Unifies the streaming sink between Table and DataStream.
> > > > - It may be hard to introduce table-related features to
> > > > StreamingFileSink.
> > > >
> > > > I prefer the first way a little. What do you think?
> > >
> > > I would be surprised if adding “exactly-once related logic” is just 200
> > > lines of code. There are things like multi-part file upload to S3, and
> > > there are also some pending features like [1]. I would suggest asking or
> > > involving Klou in this discussion.
> > >
> > > If it’s as easy to support exactly-once streaming with the current batch
> > > sink, that raises the question: why do we need to maintain
> > > StreamingFileSink?
> > >
> > > The worst possible outcome from my perspective would be if we had another
> > > example of an operator/logic implemented independently in both the
> > > DataStream API and the Table API, because I’m pretty sure they will not be
> > > fully compatible, each with its own set of limitations, quirks and features.
> > > Especially since unifying such operators is on our long-term roadmap and
> > > wish list.
> > >
> > > Piotrek
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-11499
> > >
> > > > On 16 Mar 2020, at 06:55, Jingsong Li <jingsongl...@gmail.com> wrote:
> > > >
> > > > Thanks, Jinhai, for getting involved.
> > > >
> > > >> we need to add 'connector.sink.username' for UserGroupInformation when
> > > >> data is written to HDFS
> > > >
> > > > Yes. I am not an HDFS expert, but it seems we need to do this "doAs" in
> > > > the code to access an external HDFS. I will update the document.
> > > >
> > > > Best,
> > > > Jingsong Lee
> > > >
> > > > On Mon, Mar 16, 2020 at 12:01 PM Jingsong Li <jingsongl...@gmail.com>
> > > > wrote:
> > > >
> > > >> Thanks, Piotr and Yun, for getting involved.
> > > >>
> > > >> Hi Piotr and Yun, regarding the implementation:
> > > >>
> > > >> FLINK-14254 [1] introduces the batch sink to the table world. It deals
> > > >> with partitions, the metastore, etc., and it just reuses the
> > > >> DataSet/DataStream FileInputFormat and FileOutputFormat. The filesystem
> > > >> connector cannot do without FileInputFormat, because it needs to deal
> > > >> with files and splits. Formats like ORC and Parquet need to read the
> > > >> whole file and have different split logic.
> > > >>
> > > >> So, back to the file system connector:
> > > >> - It needs to introduce FilesystemTableFactory, FilesystemTableSource
> > > >> and FilesystemTableSink.
> > > >> - For sources, it reuses the DataSet/DataStream FileInputFormats; there
> > > >> is no other interface for file reading.
> > > >>
> > > >> For file sinks:
> > > >> - The batch sink uses FLINK-14254.
> > > >> - The streaming sink has two options.
> > > >>
> > > >> First way is reusing the batch sink in FLINK-14254. It has handled the
> > > >> partition and metastore logic well.
> > > >> - Unifies batch and streaming.
> > > >> - Using FileOutputFormat is consistent with FileInputFormat.
> > > >> - Adds exactly-once related logic. Just 200+ lines of code.
> > > >> - It's natural to support more table features, like partition commit,
> > > >> auto compaction, etc.
> > > >>
> > > >> Second way is reusing the DataStream StreamingFileSink:
> > > >> - Unifies the streaming sink between Table and DataStream.
> > > >> - It may be hard to introduce table-related features to
> > > >> StreamingFileSink.
> > > >>
> > > >> I prefer the first way a little. What do you think?
> > > >>
> > > >> Hi Yun,
> > > >>
> > > >>> Watermark mechanism might not be enough.
> > > >>
> > > >> Watermarks of subtasks are the same in the "snapshotState".
> > > >>
> > > >>> we might need to also do some coordination between subtasks.
> > > >>
> > > >> Yes, the JobMaster is the role that controls subtasks. The metastore is a
> > > >> very fragile single point that cannot be accessed in a distributed
> > > >> fashion, so it is accessed uniformly by the JobMaster.
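
As a rough illustration of the coordination discussed here, the Python sketch below lets subtasks report watermarks to one coordinator, which alone decides when a partition is committed. Everything in it is hypothetical (`PartitionCommitCoordinator`, its methods, the timestamps), and taking the minimum watermark over all subtasks is a conservative assumption made for this sketch, not something stated in the thread.

```python
class PartitionCommitCoordinator:
    """Hypothetical single coordinator that decides when a partition is done."""

    def __init__(self, num_subtasks):
        self.watermarks = [float("-inf")] * num_subtasks
        self.open_partitions = {}  # partition name -> end timestamp
        self.committed = []        # stand-in for metastore updates

    def add_partition(self, name, end_ts):
        self.open_partitions[name] = end_ts

    def report_watermark(self, subtask, watermark):
        # Watermarks never move backwards for a subtask.
        self.watermarks[subtask] = max(self.watermarks[subtask], watermark)
        low = min(self.watermarks)
        ready = sorted(n for n, end in self.open_partitions.items() if end <= low)
        for name in ready:
            del self.open_partitions[name]
            self.committed.append(name)  # only the coordinator touches this
        return ready


coordinator = PartitionCommitCoordinator(num_subtasks=2)
coordinator.add_partition("dt=2020-03-13", end_ts=100)
first = coordinator.report_watermark(0, 120)   # subtask 1 has not reported yet
second = coordinator.report_watermark(1, 90)   # subtask 1 is still behind
third = coordinator.report_watermark(1, 105)   # all subtasks passed end_ts
print(first, second, third)
```

The partition is committed exactly once, and only after every subtask has passed its end timestamp, which keeps all metastore access in a single place.
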
> > > >>
> > > >> [1]https://issues.apache.org/jira/browse/FLINK-14254
> > > >>
> > > >> Best,
> > > >> Jingsong Lee
> > > >>
> > > >> On Fri, Mar 13, 2020 at 6:43 PM Yun Gao <yungao...@aliyun.com> wrote:
> > > >>
> > > >>>       Hi,
> > > >>>
> > > >>>       Many thanks to Jingsong for bringing up this discussion! Enhancing
> > > >>> the FileSystem connector in Table should largely improve usability.
> > > >>>
> > > >>>       I have the same question as Piotr. From my side, I think it
> > > >>> would be better to be able to reuse the existing StreamingFileSink. I
> > > >>> think we have begun enhancing the supported file formats (e.g., ORC,
> > > >>> Avro...), and reusing StreamingFileSink should avoid repeated work in
> > > >>> the Table library. Besides, the bucket concept also seems to match the
> > > >>> semantics of a partition.
> > > >>>
> > > >>>       For the notification of adding partitions, I wonder whether the
> > > >>> Watermark mechanism might not be enough, since a Bucket/Partition might
> > > >>> span multiple subtasks. It depends on the level of notification: if we
> > > >>> want to notify for the bucket on each subtask, using the watermark to
> > > >>> notify each subtask should be ok, but if we want to notify for the whole
> > > >>> Bucket/Partition, we might need to also do some coordination between
> > > >>> subtasks.
> > > >>>
> > > >>>
> > > >>>     Best,
> > > >>>      Yun
> > > >>>
> > > >>>
> > > >>>
> > > >>> ------------------------------------------------------------------
> > > >>> From:Piotr Nowojski <pi...@ververica.com>
> > > >>> Send Time:2020 Mar. 13 (Fri.) 18:03
> > > >>> To:dev <dev@flink.apache.org>
> > > >>> Cc:user <u...@flink.apache.org>; user-zh <user...@flink.apache.org>
> > > >>> Subject:Re: [DISCUSS] FLIP-115: Filesystem connector in Table
> > > >>>
> > > >>> Hi,
> > > >>>
> > > >>>
> > > >>> Which actual sinks/sources are you planning to use in this feature?
> > > >>> Is it about exposing StreamingFileSink in the Table API? Or do you want
> > > >>> to implement new Sinks/Sources?
> > > >>>
> > > >>> Piotrek
> > > >>>
> > > >>>> On 13 Mar 2020, at 10:04, jinhai wang <jinhai...@gmail.com> wrote:
> > > >>>>
> > > >>>
> > > >>>> Thanks for FLIP-115. It is a really useful feature for platform
> > > >>>> developers who manage hundreds of Flink-to-Hive jobs in production.
> > > >>>>
> > > >>>> I think we need to add 'connector.sink.username' for
> > > >>>> UserGroupInformation when data is written to HDFS.
> > > >>>>
> > > >>>>
> > > >>>> On 2020/3/13 at 3:33 PM, "Jingsong Li" <jingsongl...@gmail.com> wrote:
> > > >>>>
> > > >>>>   Hi everyone,
> > > >>>>
> > > >>>
> > > >>>>   I'd like to start a discussion about FLIP-115: Filesystem connector
> > > >>>>   in Table [1].
> > > >>>>   This FLIP will bring:
> > > >>>>   - Introduce Filesystem table factory in table, support
> > > >>>>   csv/parquet/orc/json/avro formats.
> > > >>>>   - Introduce streaming filesystem/hive sink in table
> > > >>>>
> > > >>>>   CC to the user mailing list. If you have any unmet needs, please feel
> > > >>>>   free to reply~
> > > >>>>
> > > >>>>   Look forward to hearing from you.
> > > >>>>
> > > >>>>   [1]
> > > >>>>   https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
> > > >>>>
> > > >>>>   Best,
> > > >>>>   Jingsong Lee
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>
> > > >>>
> > > >>>
> > > >>
> > > >> --
> > > >> Best, Jingsong Lee
> > > >>
> > > >
> > > >
> > > > --
> > > > Best, Jingsong Lee
> > >
> > >
> >
> > --
> > Best, Jingsong Lee
> >
>
