Hi Robert, Boris,

Sorry for the late reply. I took some time to look at the implementation, and overall it looks good to me. Since this is a fairly big PR, I would like to raise a few points for discussion beforehand:
1. To reduce the complexity of the PR, can we do the following?
   - move the example into a separate PR;
   - move either the Java or the Scala implementation of the model-serving module into a separate PR (they look pretty similar to me).
   This would make the PR cleaner and easier to review.

2. I think the idea of carving this implementation into
   - a model layer,
   - a serving layer, and
   - a queryable state API
   is a very good one! At a higher level, this should enable serving any kind of model, so the question is: do DataConverter / Model conform with scikit-learn's pipeline definition? (My read on this is: yes.) My understanding is that the abstraction is generic enough to support any "Model", for example the model defined in FLIP-39, but it might be a good idea for its authors to also take a look (CCed: @weihua, @shaoxuan).

3. Implementation:
   - Following up on #1, can we restructure the modules into
     - model-server-base
     - model-server-java / model-server-scala (specific to each language)?
     There is a lot of duplicated code between the Java and Scala implementations that could be factored into the base module, probably implemented in Scala, since most of the basic math / ML packages [1][2] are Scala-based.
   - The type system of this PR barely uses Flink's type information (other than the byte array, which is a pass-through). Maybe we can leverage it for strong-type guarantees on the data-processor side (CCed @timo, who might have some ideas on the type system as a whole, as he is working on the Table type system in FLIP-37). My thought is: the model itself is protobuf and thus must remain a byte array, but auxiliary data (stats, additional data types, record types, etc.) could use Flink's type system.

I will finish the review and add comments to the PR asap.
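To make the byte-array point under #3 a bit more concrete, here is a minimal sketch (class and field names are hypothetical, not taken from the PR): the framework only reads a small header that identifies the model type and is used for routing, while the protobuf payload stays an opaque byte array that the model implementation unmarshals itself.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: a model travels as a byte array with a minimal
// header (model type id + payload length) followed by the opaque payload
// (e.g. serialized protobuf). Routing only reads the header; the model
// implementation unmarshals the payload itself.
public class ModelEnvelope {
    public final int modelType;   // used for routing to the right model factory
    public final byte[] payload;  // opaque to the framework

    public ModelEnvelope(int modelType, byte[] payload) {
        this.modelType = modelType;
        this.payload = payload;
    }

    // Encode: 4-byte type id, 4-byte payload length, then the raw payload.
    public byte[] toBytes() {
        ByteBuffer buf = ByteBuffer.allocate(8 + payload.length);
        buf.putInt(modelType).putInt(payload.length).put(payload);
        return buf.array();
    }

    // Decode the header and hand the payload through untouched.
    public static ModelEnvelope fromBytes(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int type = buf.getInt();
        byte[] payload = new byte[buf.getInt()];
        buf.get(payload);
        return new ModelEnvelope(type, payload);
    }
}
```

Auxiliary data (stats, record types, etc.) would then be the part where Flink's own type system could give us strong typing.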
Thanks,
Rong

[1] https://github.com/scalanlp/breeze
[2] https://spark.apache.org/docs/latest/ml-guide.html

On Tue, Apr 30, 2019 at 2:33 AM Robert Metzger <rmetz...@apache.org> wrote:

> Hey all,
>
> I'm wondering if somebody on the list can take a look at the PR from
> FLIP-23: https://github.com/apache/flink/pull/7446
>
> On Mon, Oct 1, 2018 at 6:13 PM Rong Rong <walter...@gmail.com> wrote:
>
> > Thanks for the contribution Boris!! I've been playing around with the
> > basic model for a while back and loved it.
> > +1 and really looking forward to having the feature merging back to
> > Flink ML.
> >
> > --
> > Rong
> >
> > On Mon, Oct 1, 2018 at 7:55 AM Fabian Hueske <fhue...@gmail.com> wrote:
> >
> > > Hi everybody,
> > >
> > > The question of how to serve ML models in Flink applications came up
> > > in several conversations I had with Flink users in the last months.
> > > Recently, Boris approached me and he told me that he'd like to revive
> > > the efforts around FLIP-23 [1].
> > >
> > > In the last days, Boris extended the proposal by a speculative model
> > > evaluation which allows for evaluating multiple modes of varying
> > > complexity to ensure certain SLAs.
> > > The code does already exist in a Github repository [2].
> > >
> > > Due to the frequent user requests and the fact that the code is
> > > already present, I think would be a great feature for Flink to have.
> > > Since this is a library on top of Flink's existing APIs this should
> > > not be too hard to review.
> > >
> > > What do others think?
> > >
> > > Best, Fabian
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-23+-+Model+Serving
> > > [2] https://github.com/FlinkML/flink-speculative-modelServer
> > >
> > > On Mon, Feb 5, 2018 at 1:11 PM Stavros Kontopoulos <
> > > st.kontopou...@gmail.com> wrote:
> > >
> > > > Thanx @Fabian. I will update the document accordingly wrt metrics.
> > > > I agree there are pros and cons.
> > > >
> > > > Best,
> > > > Stavros
> > > >
> > > > On Wed, Jan 31, 2018 at 1:07 AM, Fabian Hueske <fhue...@gmail.com>
> > > > wrote:
> > > >
> > > > > OK, I think there was plenty of time to comment on this FLIP.
> > > > > I'll move it to the ACCEPTED status.
> > > > >
> > > > > @Stavros, please consider the feedback regarding the metrics.
> > > > > I agree with Chesnay that metrics should be primarily exposed via
> > > > > the metrics system.
> > > > > Storing them in state makes them fault-tolerant and queryable if
> > > > > the state is properly configured.
> > > > >
> > > > > Thanks,
> > > > > Fabian
> > > > >
> > > > > 2018-01-22 17:19 GMT+01:00 Chesnay Schepler <ches...@apache.org>:
> > > > >
> > > > > > I'm currently looking over it, but one thing that stood out was
> > > > > > that the FLIP proposes to use queryable state as a monitoring
> > > > > > solution. Given that we have a metric system that integrates
> > > > > > with plenty of commonly used metric backends this doesn't
> > > > > > really make sense to me.
> > > > > >
> > > > > > Storing them in state still has value in terms of
> > > > > > fault-tolerance though, since this is something that the metric
> > > > > > system doesn't provide by itself.
> > > > > >
> > > > > > On 18.01.2018 13:57, Fabian Hueske wrote:
> > > > > >
> > > > > >> Are there any more comments on the FLIP?
> > > > > >>
> > > > > >> Otherwise, I'd suggest to move the FLIP to the accepted FLIPs
> > > > > >> [1] and continue with the implementation.
> > > > > >>
> > > > > >> Also, is there a committer who'd like to shepherd the FLIP and
> > > > > >> review the corresponding PRs?
> > > > > >> Of course, everybody is welcome to review the code but we need
> > > > > >> at least one committer who will eventually merge the changes.
> > > > > >>
> > > > > >> Best,
> > > > > >> Fabian
> > > > > >>
> > > > > >> [1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> > > > > >>
> > > > > >> 2017-12-04 10:54 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
> > > > > >>
> > > > > >>> Hi,
> > > > > >>>
> > > > > >>> Sorry for the late follow up.
> > > > > >>>
> > > > > >>> I think I understand the motivation for choosing ProtoBuf as
> > > > > >>> the representation and serialization format and this makes
> > > > > >>> sense to me.
> > > > > >>>
> > > > > >>> However, it might be a good idea to provide tooling to
> > > > > >>> convert Flink types (described as TypeInformation) to
> > > > > >>> ProtoBuf.
> > > > > >>> Otherwise, users of the model serving library would need to
> > > > > >>> manually convert their data types (say Scala tuples, case
> > > > > >>> classes, or Avro Pojos) to ProtoBuf messages.
> > > > > >>> I don't think that this needs to be included in the first
> > > > > >>> version but it might be a good extension to make the library
> > > > > >>> easier to use.
> > > > > >>>
> > > > > >>> Best,
> > > > > >>> Fabian
> > > > > >>>
> > > > > >>> 2017-11-28 17:22 GMT+01:00 Boris Lublinsky <
> > > > > >>> boris.lublin...@lightbend.com>:
> > > > > >>>
> > > > > >>>> Thanks Fabian,
> > > > > >>>> More below
> > > > > >>>>
> > > > > >>>> Boris Lublinsky
> > > > > >>>> FDP Architect
> > > > > >>>> boris.lublin...@lightbend.com
> > > > > >>>> https://www.lightbend.com/
> > > > > >>>>
> > > > > >>>> On Nov 28, 2017, at 8:21 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> > > > > >>>>
> > > > > >>>> Hi Boris and Stavros,
> > > > > >>>>
> > > > > >>>> Thanks for the responses.
> > > > > >>>>
> > > > > >>>> Ad 1) Thanks for the clarification. I think I misunderstood
> > > > > >>>> this part of the proposal.
> > > > > >>>> I interpreted the argument why to chose ProtoBuf for network
> > > > > >>>> encoding ("ability to represent different data types") such
> > > > > >>>> that a model pipeline should work on different data types.
> > > > > >>>> I agree that it should be possible to give records of the
> > > > > >>>> same type (but with different keys) to different models. The
> > > > > >>>> key-based join approach looks good to me.
> > > > > >>>>
> > > > > >>>> Ad 2) I understand that ProtoBuf is a good choice to
> > > > > >>>> serialize models for the given reasons.
> > > > > >>>> However, the choice of ProtoBuf serialization for the
> > > > > >>>> records might make the integration with existing libraries
> > > > > >>>> and also regular DataStream programs more difficult.
> > > > > >>>> They all use Flink's TypeSerializer system to serialize and
> > > > > >>>> deserialize records by default. Hence, we would need to add
> > > > > >>>> a conversion step before records can be passed to a model
> > > > > >>>> serving operator.
> > > > > >>>> Are you expecting some common format that all records follow
> > > > > >>>> (such as a Row or Vector type) or do you plan to support
> > > > > >>>> arbitrary records such as Pojos?
> > > > > >>>> If you plan for a specific type, you could add a
> > > > > >>>> TypeInformation for this type with a TypeSerializer that is
> > > > > >>>> based on ProtoBuf.
> > > > > >>>>
> > > > > >>>> The way I look at it is slightly different. The common
> > > > > >>>> format for records, supported by Flink, is a byte array with
> > > > > >>>> a little bit of header describing the data type, which is
> > > > > >>>> used for routing. The actual unmarshalling is done by the
> > > > > >>>> model implementation itself.
> > > > > >>>> This provides the maximum flexibility and gives the user the
> > > > > >>>> freedom to create his own types without breaking the
> > > > > >>>> underlying framework.
> > > > > >>>>
> > > > > >>>> Ad 4) @Boris: I made this point not about the serialization
> > > > > >>>> format but how the library would integrate with Flink's
> > > > > >>>> DataStream API.
> > > > > >>>> I thought I had seen a code snippet that showed a new method
> > > > > >>>> on the DataStream object but cannot find this anymore.
> > > > > >>>> So, I just wanted to make the point that we should not
> > > > > >>>> change the DataStream API (unless it lacks support for some
> > > > > >>>> features) and build the model serving library on top of it.
> > > > > >>>> But I get from Stavros' answer that this is your design
> > > > > >>>> anyway.
> > > > > >>>>
> > > > > >>>> Ad 5) The metrics system is the default way to expose system
> > > > > >>>> and job metrics in Flink. Due to the pluggable reporter
> > > > > >>>> interface and various reporters, they can be easily
> > > > > >>>> integrated in many production environments.
> > > > > >>>> A solution based on queryable state will always need custom
> > > > > >>>> code to access the information. Of course this can be an
> > > > > >>>> optional feature.
> > > > > >>>>
> > > > > >>>> What do others think about this proposal?
> > > > > >>>>
> > > > > >>>> We had agreement among the work group - Eron, Bas, Andrea,
> > > > > >>>> etc. - but you are the first one outside of it.
> > > > > >>>> My book
> > > > > >>>> https://www.lightbend.com/blog/serving-machine-learning-models-free-oreilly-ebook-from-lightbend
> > > > > >>>> has reasonably good reviews, so we are hoping this will work.
> > > > > >>>>
> > > > > >>>> Best, Fabian
> > > > > >>>>
> > > > > >>>> 2017-11-28 13:53 GMT+01:00 Stavros Kontopoulos <
> > > > > >>>> st.kontopou...@gmail.com>:
> > > > > >>>>
> > > > > >>>>> Hi Fabian thanx!
> > > > > >>>>>
> > > > > >>>>> 1) Is it a strict requirement that a ML pipeline must be
> > > > > >>>>>> able to handle different input types?
> > > > > >>>>>> I understand that it makes sense to have different models
> > > > > >>>>>> for different instances of the same type, i.e., same data
> > > > > >>>>>> type but different keys. Hence, the key-based joins make
> > > > > >>>>>> sense to me. However, couldn't completely different types
> > > > > >>>>>> be handled by different ML pipelines or would there be
> > > > > >>>>>> major drawbacks?
> > > > > >>>>>
> > > > > >>>>> Could you elaborate more on this? Right now we only use
> > > > > >>>>> keys when we do the join. A given pipeline can handle only
> > > > > >>>>> a well defined type (the type can be a simple string with a
> > > > > >>>>> custom value, no need to be a class type) which serves as a
> > > > > >>>>> key.
> > > > > >>>>>
> > > > > >>>>> 2)
> > > > > >>>>>
> > > > > >>>>> I think from an API point of view it would be better to not
> > > > > >>>>>> require input records to be encoded as ProtoBuf messages.
> > > > > >>>>>> Instead, the model server could accept strongly-typed
> > > > > >>>>>> objects (Java/Scala) and (if necessary) convert them to
> > > > > >>>>>> ProtoBuf messages internally. In case we need to support
> > > > > >>>>>> different types of records (see my first point), we can
> > > > > >>>>>> introduce a Union type (i.e., an n-ary Either type). I see
> > > > > >>>>>> that we need some kind of binary encoding format for the
> > > > > >>>>>> models but maybe also this can be designed to be pluggable
> > > > > >>>>>> such that later other encodings can be added.
> > > > > >>>>>
> > > > > >>>>> We do use scala classes (strongly typed classes), protobuf
> > > > > >>>>> is only used on the wire. For on-the-wire encoding we
> > > > > >>>>> prefer protobufs for size, expressiveness and ability to
> > > > > >>>>> represent different data types.
> > > > > >>>>>
> > > > > >>>>> 3)
> > > > > >>>>>
> > > > > >>>>> I think the DataStream Java API should be supported as a
> > > > > >>>>>> first class citizens for this library.
> > > > > >>>>>
> > > > > >>>>> I agree. It should be either first priority or a next thing
> > > > > >>>>> to do.
> > > > > >>>>>
> > > > > >>>>> 4)
> > > > > >>>>>
> > > > > >>>>> For the integration with the DataStream API, we could
> > > > > >>>>>> provide an API that receives (typed) DataStream objects,
> > > > > >>>>>> internally constructs the DataStream operators, and
> > > > > >>>>>> returns one (or more) result DataStreams.
> > > > > >>>>>> The benefit is that we don't need to change the DataStream
> > > > > >>>>>> API directly, but put a library on top. The other
> > > > > >>>>>> libraries (CEP, Table, Gelly) follow this approach.
> > > > > >>>>>
> > > > > >>>>> We will provide a DSL which will do just this. But even
> > > > > >>>>> without the DSL this is what we do with low level joins.
> > > > > >>>>>
> > > > > >>>>> 5)
> > > > > >>>>>
> > > > > >>>>> I'm skeptical about using queryable state to expose
> > > > > >>>>>> metrics. Did you consider using Flink's metrics system
> > > > > >>>>>> [1]? It is easily configurable and we provided several
> > > > > >>>>>> reporters that export the metrics.
> > > > > >>>>>
> > > > > >>>>> This of course is an option. The choice of queryable state
> > > > > >>>>> was mostly driven by the simplicity of real-time
> > > > > >>>>> integration. Any reason why the metrics system is better?
> > > > > >>>>>
> > > > > >>>>> Best,
> > > > > >>>>> Stavros
> > > > > >>>>>
> > > > > >>>>> On Mon, Nov 27, 2017 at 4:23 PM, Fabian Hueske <
> > > > > >>>>> fhue...@gmail.com> wrote:
> > > > > >>>>>
> > > > > >>>>>> Hi Stavros,
> > > > > >>>>>>
> > > > > >>>>>> thanks for the detailed FLIP!
> > > > > >>>>>> Model serving is an important use case and it's great to
> > > > > >>>>>> see efforts to add a library for this to Flink!
> > > > > >>>>>>
> > > > > >>>>>> I've read the FLIP and would like to ask a few questions
> > > > > >>>>>> and make some suggestions.
> > > > > >>>>>>
> > > > > >>>>>> 1) Is it a strict requirement that a ML pipeline must be
> > > > > >>>>>> able to handle different input types?
> > > > > >>>>>> I understand that it makes sense to have different models
> > > > > >>>>>> for different instances of the same type, i.e., same data
> > > > > >>>>>> type but different keys. Hence, the key-based joins make
> > > > > >>>>>> sense to me. However, couldn't completely different types
> > > > > >>>>>> be handled by different ML pipelines or would there be
> > > > > >>>>>> major drawbacks?
> > > > > >>>>>>
> > > > > >>>>>> 2) I think from an API point of view it would be better to
> > > > > >>>>>> not require input records to be encoded as ProtoBuf
> > > > > >>>>>> messages. Instead, the model server could accept
> > > > > >>>>>> strongly-typed objects (Java/Scala) and (if necessary)
> > > > > >>>>>> convert them to ProtoBuf messages internally. In case we
> > > > > >>>>>> need to support different types of records (see my first
> > > > > >>>>>> point), we can introduce a Union type (i.e., an n-ary
> > > > > >>>>>> Either type). I see that we need some kind of binary
> > > > > >>>>>> encoding format for the models but maybe also this can be
> > > > > >>>>>> designed to be pluggable such that later other encodings
> > > > > >>>>>> can be added.
> > > > > >>>>>>
> > > > > >>>>>> 3) I think the DataStream Java API should be supported as
> > > > > >>>>>> a first class citizens for this library.
> > > > > >>>>>>
> > > > > >>>>>> 4) For the integration with the DataStream API, we could
> > > > > >>>>>> provide an API that receives (typed) DataStream objects,
> > > > > >>>>>> internally constructs the DataStream operators, and
> > > > > >>>>>> returns one (or more) result DataStreams. The benefit is
> > > > > >>>>>> that we don't need to change the DataStream API directly,
> > > > > >>>>>> but put a library on top. The other libraries (CEP, Table,
> > > > > >>>>>> Gelly) follow this approach.
> > > > > >>>>>>
> > > > > >>>>>> 5) I'm skeptical about using queryable state to expose
> > > > > >>>>>> metrics. Did you consider using Flink's metrics system
> > > > > >>>>>> [1]? It is easily configurable and we provided several
> > > > > >>>>>> reporters that export the metrics.
> > > > > >>>>>>
> > > > > >>>>>> What do you think?
> > > > > >>>>>> Best, Fabian
> > > > > >>>>>>
> > > > > >>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html
> > > > > >>>>>>
> > > > > >>>>>> 2017-11-23 12:32 GMT+01:00 Stavros Kontopoulos <
> > > > > >>>>>> st.kontopou...@gmail.com>:
> > > > > >>>>>>
> > > > > >>>>>>> Hi guys,
> > > > > >>>>>>>
> > > > > >>>>>>> Let's discuss the new FLIP proposal for model serving
> > > > > >>>>>>> over Flink. The idea is to combine previous efforts there
> > > > > >>>>>>> and provide a library on top of Flink for serving models.
> > > > > >>>>>>>
> > > > > >>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-23+-+Model+Serving
> > > > > >>>>>>>
> > > > > >>>>>>> Code from previous efforts can be found here:
> > > > > >>>>>>> https://github.com/FlinkML
> > > > > >>>>>>>
> > > > > >>>>>>> Best,
> > > > > >>>>>>> Stavros