On Wed, Jun 5, 2019 at 6:39 AM Xiaowei Jiang <xiaow...@gmail.com> wrote:

>  Hi Gordon & Seth, this looks like a very useful feature for analyze and
> manage states.
> I agree that using DataSet is probably the most practical choice right
> now. But in the longer adding the TableAPI support for this will be nice.
>

Agreed. Migrating this API in the future to the TableAPI is definitely
something we have considered.


> When analyzing the savepoint, I assume that the state backend restores the
> state first? This approach is generic and works for any state backend.


Yes, that is correct. The process of reading state in snapshots is
currently:
1) the metadata file is read when creating the input splits for the
InputFormat. Each input split is assigned operator state and key-group
state handles.
2) For each input split, a state backend is launched and is restored with
all state of the assigned state handles. Only partially some state is
transformed into DataSets (using the savepoint.read*State(...) methods).


> However, sometimes it may be more efficient to directly analyzing the
> files on DFS without copying. We can probably add interface to allow state
> backend optimize such behavior in the future.


That sounds like an interesting direction, though at the moment it may only
make sense for full savepoints / checkpoints.
One blocker for enabling this, is having the type information of state
available in the snapshot metadata file so that schema / type of state is
known before actually reading state.
Making state schema / type information available in the metadata file is
already a recurring discussion in this thread that would be useful for not
only this feature you mentioned, but also for features like SQL integration
in the future.
Therefore, this seems to be a reasonable next step when extending on top of
the initial scope of the API proposed in the FLIP.


> Also a quick question on the example in wiki: DataSet<MyPojo> keyedState =
> operator.readKeyedState("uid", new ReaderFunction());Should
> operator.readKeyedState  be replaced with savepoint.readKeyedState here?
>

Correct, this is indeed a typo. I've corrected this in the FLIP.

Cheers,
Gordon


>
> Regards,Xiaowei
>
>     On Tuesday, June 4, 2019, 6:56:00 AM PDT, Aljoscha Krettek <
> aljos...@apache.org> wrote:
>
>  +1 I think is is a very valuable new additional and we should try and not
> get stuck on trying to design the perfect solution for everything
>
> > On 4. Jun 2019, at 13:24, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
> >
> > +1 to renaming it as State Processing API and adding it under the
> > flink-libraries module.
> >
> > I also think we can start with the development of the feature. From the
> > feedback so far, it seems like we're in a good spot to add in at least
> the
> > initial version of this API, hopefully making it ready for 1.9.0.
> >
> > Cheers,
> > Gordon
> >
> > On Tue, Jun 4, 2019 at 7:14 PM Seth Wiesman <sjwies...@gmail.com> wrote:
> >
> >> It seems like a recurring piece of feedback was a different name. I’d
> like
> >> to propose moving the functionality to the libraries module and naming
> this
> >> the State Processing API.
> >>
> >> Seth
> >>
> >>> On May 31, 2019, at 3:47 PM, Seth Wiesman <sjwies...@gmail.com> wrote:
> >>>
> >>> The SavepointOutputFormat only writes out the savepoint metadata file
> >> and should be mostly ignored.
> >>>
> >>> The actual state is written out by stream operators and tied into the
> >> flink runtime[1, 2, 3].
> >>>
> >>> This is the most important part and the piece that I don’t think can be
> >> reasonably extracted.
> >>>
> >>> Seth
> >>>
> >>> [1]
> >>
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/operators/KeyedStateBootstrapOperator.java#L84
> >>>
> >>> [2]
> >>
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/SnapshotUtils.java
> >>>
> >>> [3]
> >>
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/BoundedOneInputStreamTaskRunner.java
> >>>
> >>>> On May 31, 2019, at 3:08 PM, Jan Lukavský <je...@seznam.cz> wrote:
> >>>>
> >>>> Hi Seth,
> >>>>
> >>>> yes, that helped! :-)
> >>>>
> >>>> What I was looking for is essentially
> >> `org.apache.flink.connectors.savepoint.output.SavepointOutputFormat`. It
> >> would be great if this could be written in a way, that would enable
> reusing
> >> it in different engine (as I mentioned - Apache Spark). There seem to be
> >> some issues though. It uses interface Savepoint, which uses several
> other
> >> objects and interfaces from Flink's runtime. Maybe some convenience API
> >> might help - Apache Beam, handles operator naming, so that definitely
> >> should be transitionable between systems, but I'm not sure, how to
> >> construct OperatorID from this name. Would you think, that it is
> possible
> >> to come up with something that could be used in this portable way?
> >>>>
> >>>> I understand, there are some more conditions, that need to be
> satisfied
> >> (grouping, aggregating, ...), which would of course have to be handled
> by
> >> the target system. But Apache Beam can help leverage that. My idea would
> >> be, that there can be runner specified PTransform, that takes
> PCollection
> >> of some tuples of `(operator name, key, state name, value1), (operator
> >> name, key, state name, value2)`, and Runner's responsibility would be to
> >> group/aggregate this so that it can be written by runner's provided
> writer
> >> (output format).
> >>>>
> >>>> All of this would need a lot more design, these are just ideas of
> "what
> >> could be possible", I was just wondering if this FLIP can make some
> first
> >> steps towards this.
> >>>>
> >>>> Many thanks for comments,
> >>>>
> >>>> Jan
> >>>>
> >>>>> On 5/31/19 8:12 PM, Seth Wiesman wrote:
> >>>>> @Jan Gotcha,
> >>>>>
> >>>>> So in reusing components it explicitly is not a writer. This is not a
> >> savepoint output format in the way we have a parquet output format. The
> >> reason for the Transform api is to hide the underlying details, it does
> not
> >> simply append a output writer to the end of a dataset. This gets into
> the
> >> implementation details but at a high level, the dataset is:
> >>>>>
> >>>>> 1) partitioned using key groups
> >>>>> 2) data is run through a standard stream operator that takes a
> >> snapshot of its state after processing all records and outputs metadata
> >> handles for each subtask
> >>>>> 3) those metadata handles are aggregated down to a single savepoint
> >> handle
> >>>>> 4) that handle is written out as a final metadata file
> >>>>>
> >>>>> What’s important here is that the api actually depends on the data
> >> flow collection and state is written out as a side effect of taking a
> >> savepoint. The FLIP describes a lose coupling to the dataset api for
> >> eventual migration to BoundedStream, that is true. However, the api does
> >> require knowing what concrete data flow is being used to perform these
> >> re-partitionings  and post aggregations.
> >>>>>
> >>>>> I’m linking to my prototype implementation, particularly what
> actually
> >> happens when you call write and run the transformations. Let me know if
> >> that helps clarify.
> >>>>>
> >>>>> Seth
> >>>>>
> >>>>>
> >>
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/api/WritableSavepoint.java#L63
> >>>>>
> >>>>>
> >>>>>
> >>>>>> On May 31, 2019, at 7:46 AM, Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>>
> >>>>>> Hi Seth,
> >>>>>>
> >>>>>> that sounds reasonable. What I was asking for was not to reverse
> >> engineer binary format, but to make the savepoint write API a little
> more
> >> reusable, so that it could be wrapped into some other technology. I
> don't
> >> know the details enough to propose a solution, but it seems to me, that
> it
> >> could be possible to use something like Writer instead of Transform. Or
> >> maybe the Transform can use the Writer internally, the goal is just to
> >> enable to create the savepoint from "'outside" of Flink (with some
> library,
> >> of course).
> >>>>>>
> >>>>>> Jan
> >>>>>>
> >>>>>>> On 5/31/19 1:17 PM, Seth Wiesman wrote:
> >>>>>>> @Konstantin agreed, that was a large impotence for this feature.
> >> Also I am happy to change the name to something that better  describes
> the
> >> feature set.
> >>>>>>>
> >>>>>>> @Lan
> >>>>>>>
> >>>>>>> Savepoints depend heavily on a number of flink internal components
> >> that may change between versions: state backends internals, type
> >> serializers, the specific hash function used to turn a UID into an
> >> OperatorID, etc. I consider it a feature of this proposal that the
> library
> >> depends on those internal components instead of reverse engineering the
> >> binary format. This way as those internals change, or new state features
> >> are added (think the recent addition of TTL) we will get support
> >> automatically. I do not believe anything else is maintainable.
> >>>>>>>
> >>>>>>> Seth
> >>>>>>>
> >>>>>>>> On May 31, 2019, at 5:56 AM, Jan Lukavský <je...@seznam.cz>
> wrote:
> >>>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> this is awesome, and really useful feature. If I might ask for one
> >> thing to consider - would it be possible to make the Savepoint
> manipulation
> >> API (at least writing the Savepoint) less dependent on other parts of
> Flink
> >> internals (e.g. |KeyedStateBootstrapFunction|) and provide something
> more
> >> general (e.g. some generic Writer)? Why I'm asking for that - I can
> totally
> >> imagine situation, where users might want to create bootstrapped state
> by
> >> some other runner (e.g. Apache Spark), and then run Apache Flink after
> the
> >> state has been created. This makes even more sense in context of Apache
> >> Beam, which provides all the necessary work to make this happen. The
> >> question is - would it be possible to design this feature so that
> writing
> >> the savepoint from different runner would be possible?
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>>
> >>>>>>>> Jan
> >>>>>>>>
> >>>>>>>>> On 5/30/19 1:14 AM, Seth Wiesman wrote:
> >>>>>>>>> Hey Everyone!
> >>>>>>>>>
> >>>>>>>>> Gordon and I have been discussing adding a savepoint connector to
> >> flink for reading, writing and modifying savepoints.
> >>>>>>>>>
> >>>>>>>>> This is useful for:
> >>>>>>>>>
> >>>>>>>>>  Analyzing state for interesting patterns
> >>>>>>>>>  Troubleshooting or auditing jobs by checking for discrepancies
> >> in state
> >>>>>>>>>  Bootstrapping state for new applications
> >>>>>>>>>  Modifying savepoints such as:
> >>>>>>>>>      Changing max parallelism
> >>>>>>>>>      Making breaking schema changes
> >>>>>>>>>      Correcting invalid state
> >>>>>>>>>
> >>>>>>>>> We are looking forward to your feedback!
> >>>>>>>>>
> >>>>>>>>> This is the FLIP:
> >>>>>>>>>
> >>>>>>>>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-43%3A+Savepoint+Connector
> >>>>>>>>>
> >>>>>>>>> Seth
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>
>

Reply via email to