+1 to renaming it as State Processing API and adding it under the flink-libraries module.
I also think we can start with the development of the feature. From the feedback so far, it seems like we're in a good spot to add at least the initial version of this API, hopefully making it ready for 1.9.0.

Cheers,
Gordon

On Tue, Jun 4, 2019 at 7:14 PM Seth Wiesman <sjwies...@gmail.com> wrote:

> It seems like a recurring piece of feedback was a different name. I'd like
> to propose moving the functionality to the libraries module and naming it
> the State Processing API.
>
> Seth
>
> > On May 31, 2019, at 3:47 PM, Seth Wiesman <sjwies...@gmail.com> wrote:
> >
> > The SavepointOutputFormat only writes out the savepoint metadata file
> > and should be mostly ignored.
> >
> > The actual state is written out by stream operators and tied into the
> > Flink runtime [1, 2, 3].
> >
> > This is the most important part and the piece that I don't think can be
> > reasonably extracted.
> >
> > Seth
> >
> > [1] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/operators/KeyedStateBootstrapOperator.java#L84
> >
> > [2] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/SnapshotUtils.java
> >
> > [3] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/BoundedOneInputStreamTaskRunner.java
> >
> >> On May 31, 2019, at 3:08 PM, Jan Lukavský <je...@seznam.cz> wrote:
> >>
> >> Hi Seth,
> >>
> >> yes, that helped! :-)
> >>
> >> What I was looking for is essentially
> >> `org.apache.flink.connectors.savepoint.output.SavepointOutputFormat`.
> >> It would be great if this could be written in a way that would enable
> >> reusing it in a different engine (as I mentioned, Apache Spark). There
> >> seem to be some issues, though. It uses the Savepoint interface, which
> >> uses several other objects and interfaces from Flink's runtime. Maybe
> >> some convenience API might help. Apache Beam handles operator naming,
> >> so names should definitely be transferable between systems, but I'm
> >> not sure how to construct an OperatorID from such a name. Do you think
> >> it is possible to come up with something that could be used in this
> >> portable way?
> >>
> >> I understand there are some more conditions that need to be satisfied
> >> (grouping, aggregating, ...), which would of course have to be handled
> >> by the target system. But Apache Beam can help leverage that. My idea
> >> would be that there could be a runner-specific PTransform that takes a
> >> PCollection of tuples like `(operator name, key, state name, value1),
> >> (operator name, key, state name, value2)`, and the runner's
> >> responsibility would be to group/aggregate these so that they can be
> >> written by the runner-provided writer (output format).
> >>
> >> All of this would need a lot more design; these are just ideas of
> >> "what could be possible". I was just wondering whether this FLIP could
> >> make some first steps towards that.
> >>
> >> Many thanks for comments,
> >>
> >> Jan
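[Editor's note: to make the portability idea above concrete, here is a minimal sketch of the kind of runner-specific PTransform Jan describes. Nothing below exists in the FLIP or the prototype; StateEntry, WriteSavepoint, and SavepointWriterFn are hypothetical names introduced purely for illustration.]

import java.io.Serializable;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

/** One bootstrapped state entry: (operator name, key, state name, value). */
class StateEntry implements Serializable {
  String operatorName;
  String key;
  String stateName;
  byte[] value;
}

/** Hypothetical stub for a runner-provided writer, e.g. one wrapping Flink's SavepointOutputFormat. */
class SavepointWriterFn extends DoFn<KV<String, Iterable<StateEntry>>, Void> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // The runner-specific part: turn one operator's grouped state into
    // savepoint files. How to derive an OperatorID from the operator name
    // is exactly the open question raised in the thread.
  }
}

/** Sketch of the runner-specific transform: group state entries per operator, then write them. */
class WriteSavepoint extends PTransform<PCollection<StateEntry>, PDone> {
  @Override
  public PDone expand(PCollection<StateEntry> entries) {
    entries
        // Key every entry by operator name so all state destined for one
        // operator lands in the same group.
        .apply("KeyByOperator", ParDo.of(new DoFn<StateEntry, KV<String, StateEntry>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.output(KV.of(c.element().operatorName, c.element()));
          }
        }))
        // The grouping/aggregation step Jan mentions, performed by the runner.
        .apply(GroupByKey.create())
        // Hand each group to the runner-provided writer.
        .apply("WriteGroup", ParDo.of(new SavepointWriterFn()));
    return PDone.in(entries.getPipeline());
  }
}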
> >>> On 5/31/19 8:12 PM, Seth Wiesman wrote:
> >>> @Jan Gotcha,
> >>>
> >>> So in reusing components it explicitly is not a writer. This is not a
> >>> savepoint output format in the way we have a parquet output format.
> >>> The reason for the Transform API is to hide the underlying details;
> >>> it does not simply append an output writer to the end of a dataset.
> >>> This gets into the implementation details, but at a high level the
> >>> dataset is:
> >>>
> >>> 1) partitioned using key groups
> >>> 2) run through a standard stream operator that takes a snapshot of
> >>>    its state after processing all records and outputs metadata
> >>>    handles for each subtask
> >>> 3) those metadata handles are aggregated down to a single savepoint
> >>>    handle
> >>> 4) that handle is written out as a final metadata file
> >>>
> >>> What's important here is that the API actually depends on the data
> >>> flow collection, and state is written out as a side effect of taking
> >>> a savepoint. The FLIP describes a loose coupling to the DataSet API
> >>> for eventual migration to BoundedStream; that is true. However, the
> >>> API does require knowing which concrete data flow is being used to
> >>> perform these re-partitionings and post-aggregations.
> >>>
> >>> I'm linking to my prototype implementation, particularly what
> >>> actually happens when you call write and run the transformations.
> >>> Let me know if that helps clarify.
> >>>
> >>> Seth
> >>>
> >>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/api/WritableSavepoint.java#L63
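[Editor's note: read alongside the prototype link above, steps 1-4 translate roughly into the following DataSet flow. This is a paraphrase, not the prototype's exact code: KeyedStateBootstrapOperator, BoundedOneInputStreamTaskRunner, and SavepointOutputFormat are the classes linked earlier in the thread, while KeyGroupPartitioner and MergeHandlesToSavepoint are illustrative stand-ins for the glue around them.]

// 1) Partition the bootstrap dataset by key group, mirroring how Flink
//    distributes keyed state across parallel subtasks.
DataSet<Event> partitioned = input
    .partitionCustom(new KeyGroupPartitioner(maxParallelism), keySelector);

// 2) Run each partition through a stream operator that processes all
//    records, then snapshots its state and emits a per-subtask metadata
//    handle.
DataSet<OperatorSubtaskState> handles = partitioned
    .mapPartition(new BoundedOneInputStreamTaskRunner<>(
        new KeyedStateBootstrapOperator<>(bootstrapFunction)));

// 3) Aggregate the per-subtask handles down to a single savepoint handle.
DataSet<Savepoint> savepoint = handles
    .reduceGroup(new MergeHandlesToSavepoint());

// 4) Write that handle out as the final metadata file.
savepoint.output(new SavepointOutputFormat(savepointPath));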
> >>>> On May 31, 2019, at 7:46 AM, Jan Lukavský <je...@seznam.cz> wrote:
> >>>>
> >>>> Hi Seth,
> >>>>
> >>>> that sounds reasonable. What I was asking for was not to reverse
> >>>> engineer the binary format, but to make the savepoint write API a
> >>>> little more reusable, so that it could be wrapped into some other
> >>>> technology. I don't know the details well enough to propose a
> >>>> solution, but it seems to me that it could be possible to use
> >>>> something like a Writer instead of a Transform. Or maybe the
> >>>> Transform could use the Writer internally; the goal is just to
> >>>> enable creating the savepoint from "outside" of Flink (with some
> >>>> library, of course).
> >>>>
> >>>> Jan
> >>>>
> >>>>> On 5/31/19 1:17 PM, Seth Wiesman wrote:
> >>>>> @Konstantin agreed, that was a large impetus for this feature. Also
> >>>>> I am happy to change the name to something that better describes
> >>>>> the feature set.
> >>>>>
> >>>>> @Jan
> >>>>>
> >>>>> Savepoints depend heavily on a number of Flink-internal components
> >>>>> that may change between versions: state backend internals, type
> >>>>> serializers, the specific hash function used to turn a UID into an
> >>>>> OperatorID, etc. I consider it a feature of this proposal that the
> >>>>> library depends on those internal components instead of reverse
> >>>>> engineering the binary format. This way, as those internals change,
> >>>>> or as new state features are added (think the recent addition of
> >>>>> TTL), we will get support automatically. I do not believe anything
> >>>>> else is maintainable.
> >>>>>
> >>>>> Seth
> >>>>>
> >>>>>> On May 31, 2019, at 5:56 AM, Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> this is an awesome and really useful feature. If I might ask for
> >>>>>> one thing to consider - would it be possible to make the Savepoint
> >>>>>> manipulation API (at least writing the Savepoint) less dependent
> >>>>>> on other parts of Flink internals (e.g.
> >>>>>> `KeyedStateBootstrapFunction`) and provide something more general
> >>>>>> (e.g. some generic Writer)? Why I'm asking for that - I can
> >>>>>> totally imagine a situation where users might want to create
> >>>>>> bootstrapped state with some other runner (e.g. Apache Spark), and
> >>>>>> then run Apache Flink after the state has been created. This makes
> >>>>>> even more sense in the context of Apache Beam, which provides all
> >>>>>> the necessary work to make this happen. The question is - would it
> >>>>>> be possible to design this feature so that writing the savepoint
> >>>>>> from a different runner would be possible?
> >>>>>>
> >>>>>> Cheers,
> >>>>>>
> >>>>>> Jan
> >>>>>>
> >>>>>>> On 5/30/19 1:14 AM, Seth Wiesman wrote:
> >>>>>>> Hey Everyone!
> >>>>>>>
> >>>>>>> Gordon and I have been discussing adding a savepoint connector to
> >>>>>>> Flink for reading, writing and modifying savepoints.
> >>>>>>>
> >>>>>>> This is useful for:
> >>>>>>>
> >>>>>>> - Analyzing state for interesting patterns
> >>>>>>> - Troubleshooting or auditing jobs by checking for discrepancies
> >>>>>>>   in state
> >>>>>>> - Bootstrapping state for new applications
> >>>>>>> - Modifying savepoints, such as:
> >>>>>>>   - Changing max parallelism
> >>>>>>>   - Making breaking schema changes
> >>>>>>>   - Correcting invalid state
> >>>>>>>
> >>>>>>> We are looking forward to your feedback!
> >>>>>>>
> >>>>>>> This is the FLIP:
> >>>>>>>
> >>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-43%3A+Savepoint+Connector
> >>>>>>>
> >>>>>>> Seth
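[Editor's note: for readers coming to the thread cold, here is a rough sketch of what bootstrapping state with the proposed API could look like, based on the prototype linked above. The builder names follow the prototype at the time of this discussion and may change before anything is released; Account and AccountBootstrapFunction are made-up example types.]

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Bounded input used to seed keyed state.
DataSet<Account> accounts = env.readCsvFile("file:///accounts.csv")
    .pojoType(Account.class, "id", "balance");

// Describe how each record becomes keyed state; the bootstrap function
// uses the same state primitives as a regular KeyedProcessFunction.
BootstrapTransformation<Account> transformation = OperatorTransformation
    .bootstrapWith(accounts)
    .keyBy(account -> account.id)
    .transform(new AccountBootstrapFunction()); // a KeyedStateBootstrapFunction

// Assemble a new savepoint holding that state under a fixed operator uid
// and write out the metadata file plus state files.
Savepoint
    .create(new MemoryStateBackend(), 128 /* max parallelism */)
    .withOperator("accounts-uid", transformation)
    .write("file:///new-savepoint");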