Hi everybody,

The question of how to serve ML models in Flink applications came up in several conversations I had with Flink users in recent months. Recently, Boris approached me and told me that he'd like to revive the efforts around FLIP-23 [1].
In the last few days, Boris extended the proposal with speculative model evaluation, which allows evaluating multiple models of varying complexity to ensure certain SLAs. The code already exists in a GitHub repository [2].

Given the frequent user requests and the fact that the code is already present, I think this would be a great feature for Flink to have. Since this is a library on top of Flink's existing APIs, it should not be too hard to review.

What do others think?

Best, Fabian

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-23+-+Model+Serving
[2] https://github.com/FlinkML/flink-speculative-modelServer

On Mon, Feb 5, 2018 at 13:11, Stavros Kontopoulos <st.kontopou...@gmail.com> wrote:

> Thanks @Fabian. I will update the document accordingly wrt metrics.
> I agree there are pros and cons.
>
> Best,
> Stavros
>
> On Wed, Jan 31, 2018 at 1:07 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> > OK, I think there was plenty of time to comment on this FLIP.
> > I'll move it to the ACCEPTED status.
> >
> > @Stavros, please consider the feedback regarding the metrics.
> > I agree with Chesnay that metrics should be primarily exposed via the
> > metrics system.
> > Storing them in state makes them fault-tolerant and queryable if the
> > state is properly configured.
> >
> > Thanks,
> > Fabian
> >
> > 2018-01-22 17:19 GMT+01:00 Chesnay Schepler <ches...@apache.org>:
> >
> > > I'm currently looking over it, but one thing that stood out was that
> > > the FLIP proposes to use queryable state as a monitoring solution.
> > > Given that we have a metric system that integrates with plenty of
> > > commonly used metric backends, this doesn't really make sense to me.
> > >
> > > Storing them in state still has value in terms of fault tolerance,
> > > though, since this is something that the metric system doesn't
> > > provide by itself.
> > >
> > > On 18.01.2018 13:57, Fabian Hueske wrote:
> > >
> > >> Are there any more comments on the FLIP?
> > >>
> > >> Otherwise, I'd suggest moving the FLIP to the accepted FLIPs [1] and
> > >> continuing with the implementation.
> > >>
> > >> Also, is there a committer who'd like to shepherd the FLIP and
> > >> review the corresponding PRs?
> > >> Of course, everybody is welcome to review the code, but we need at
> > >> least one committer who will eventually merge the changes.
> > >>
> > >> Best,
> > >> Fabian
> > >>
> > >> [1]
> > >> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> > >>
> > >> 2017-12-04 10:54 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
> > >>
> > >>> Hi,
> > >>>
> > >>> Sorry for the late follow-up.
> > >>>
> > >>> I think I understand the motivation for choosing ProtoBuf as the
> > >>> representation and serialization format, and this makes sense to me.
> > >>>
> > >>> However, it might be a good idea to provide tooling to convert
> > >>> Flink types (described as TypeInformation) to ProtoBuf.
> > >>> Otherwise, users of the model serving library would need to
> > >>> manually convert their data types (say Scala tuples, case classes,
> > >>> or Avro Pojos) to ProtoBuf messages.
> > >>> I don't think that this needs to be included in the first version,
> > >>> but it might be a good extension to make the library easier to use.
> > >>>
> > >>> Best,
> > >>> Fabian
> > >>>
> > >>> 2017-11-28 17:22 GMT+01:00 Boris Lublinsky <boris.lublin...@lightbend.com>:
> > >>>
> > >>>> Thanks Fabian,
> > >>>> More below
> > >>>>
> > >>>> Boris Lublinsky
> > >>>> FDP Architect
> > >>>> boris.lublin...@lightbend.com
> > >>>> https://www.lightbend.com/
> > >>>>
> > >>>> On Nov 28, 2017, at 8:21 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> > >>>>
> > >>>> Hi Boris and Stavros,
> > >>>>
> > >>>> Thanks for the responses.
> > >>>>
> > >>>> Ad 1) Thanks for the clarification. I think I misunderstood this
> > >>>> part of the proposal.
> > >>>> I interpreted the argument for why to choose ProtoBuf for network
> > >>>> encoding ("ability to represent different data types") such that a
> > >>>> model pipeline should work on different data types.
> > >>>> I agree that it should be possible to give records of the same
> > >>>> type (but with different keys) to different models. The key-based
> > >>>> join approach looks good to me.
> > >>>>
> > >>>> Ad 2) I understand that ProtoBuf is a good choice to serialize
> > >>>> models for the given reasons.
> > >>>> However, the choice of ProtoBuf serialization for the records
> > >>>> might make the integration with existing libraries and also
> > >>>> regular DataStream programs more difficult.
> > >>>> They all use Flink's TypeSerializer system to serialize and
> > >>>> deserialize records by default. Hence, we would need to add a
> > >>>> conversion step before records can be passed to a model serving
> > >>>> operator.
> > >>>> Are you expecting some common format that all records follow (such
> > >>>> as a Row or Vector type) or do you plan to support arbitrary
> > >>>> records such as Pojos?
> > >>>> If you plan for a specific type, you could add a TypeInformation
> > >>>> for this type with a TypeSerializer that is based on ProtoBuf.
> > >>>>
> > >>>> The way I look at it is slightly different. The common format for
> > >>>> records, as supported by Flink, is a byte array with a small
> > >>>> header that describes the data type and is used for routing. The
> > >>>> actual unmarshalling is done by the model implementation itself.
> > >>>> This provides maximum flexibility and gives users the freedom to
> > >>>> create their own types without breaking the underlying framework.
> > >>>>
> > >>>> Ad 4) @Boris: I made this point not about the serialization format
> > >>>> but about how the library would integrate with Flink's DataStream
> > >>>> API.
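[Editor's note: the byte-array-with-a-routing-header format that Boris describes above could be sketched roughly as below. This is a minimal illustration only; the class and field names, and the header layout, are assumptions, not the FLIP's actual wire format.]

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: a record is a small header (a data-type tag the
// framework reads for routing) followed by an opaque payload that only
// the model implementation knows how to unmarshal.
final class FramedRecord {
    final String dataType;  // routing tag, read by the framework
    final byte[] payload;   // opaque bytes, unmarshalled by the model impl

    FramedRecord(String dataType, byte[] payload) {
        this.dataType = dataType;
        this.payload = payload;
    }

    // Wire layout (assumed): [tag length: int][tag bytes][raw payload]
    byte[] encode() {
        byte[] tag = dataType.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + tag.length + payload.length)
                .putInt(tag.length).put(tag).put(payload).array();
    }

    // The framework decodes only the header; the payload stays opaque.
    static FramedRecord decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte[] tag = new byte[buf.getInt()];
        buf.get(tag);
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return new FramedRecord(new String(tag, StandardCharsets.UTF_8), payload);
    }
}
```

The point of the design is visible in the types: the framework never needs to know what the payload bytes mean, so users can introduce new record types without touching the serving framework.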
> > >>>> I thought I had seen a code snippet that showed a new method on
> > >>>> the DataStream object but cannot find it anymore.
> > >>>> So, I just wanted to make the point that we should not change the
> > >>>> DataStream API (unless it lacks support for some features) and
> > >>>> build the model serving library on top of it.
> > >>>> But I gather from Stavros' answer that this is your design anyway.
> > >>>>
> > >>>> Ad 5) The metrics system is the default way to expose system and
> > >>>> job metrics in Flink. Due to the pluggable reporter interface and
> > >>>> the various available reporters, metrics can be easily integrated
> > >>>> into many production environments.
> > >>>> A solution based on queryable state will always need custom code
> > >>>> to access the information. Of course, this can be an optional
> > >>>> feature.
> > >>>>
> > >>>> What do others think about this proposal?
> > >>>>
> > >>>> We had agreement within the working group - Eron, Bas, Andrea,
> > >>>> etc. - but you are the first one outside of it. My book
> > >>>> https://www.lightbend.com/blog/serving-machine-learning-models-free-oreilly-ebook-from-lightbend
> > >>>> has reasonably good reviews, so we are hoping this will work.
> > >>>>
> > >>>> Best, Fabian
> > >>>>
> > >>>> 2017-11-28 13:53 GMT+01:00 Stavros Kontopoulos <st.kontopou...@gmail.com>:
> > >>>>
> > >>>>> Hi Fabian, thanks!
> > >>>>>
> > >>>>> 1) Is it a strict requirement that a ML pipeline must be able to
> > >>>>>> handle different input types?
> > >>>>>> I understand that it makes sense to have different models for
> > >>>>>> different instances of the same type, i.e., same data type but
> > >>>>>> different keys. Hence, the key-based joins make sense to me.
> > >>>>>> However, couldn't completely different types be handled by
> > >>>>>> different ML pipelines or would there be major drawbacks?
> > >>>>>>
> > >>>>> Could you elaborate more on this? Right now we only use keys when
> > >>>>> we do the join. A given pipeline can handle only a well-defined
> > >>>>> type (the type can be a simple string with a custom value, no
> > >>>>> need to be a class type) which serves as a key.
> > >>>>>
> > >>>>> 2) I think from an API point of view it would be better to not
> > >>>>>> require input records to be encoded as ProtoBuf messages.
> > >>>>>> Instead, the model server could accept strongly-typed objects
> > >>>>>> (Java/Scala) and (if necessary) convert them to ProtoBuf
> > >>>>>> messages internally. In case we need to support different types
> > >>>>>> of records (see my first point), we can introduce a Union type
> > >>>>>> (i.e., an n-ary Either type). I see that we need some kind of
> > >>>>>> binary encoding format for the models, but maybe this too can be
> > >>>>>> designed to be pluggable such that other encodings can be added
> > >>>>>> later.
> > >>>>>>
> > >>>>> We do use Scala classes (strongly typed classes); protobuf is
> > >>>>> only used on the wire. For on-the-wire encoding we prefer
> > >>>>> protobufs for size, expressiveness, and the ability to represent
> > >>>>> different data types.
> > >>>>>
> > >>>>> 3) I think the DataStream Java API should be supported as a
> > >>>>>> first-class citizen for this library.
> > >>>>>>
> > >>>>> I agree. It should be either a first priority or the next thing
> > >>>>> to do.
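[Editor's note: the "Union type (i.e., an n-ary Either type)" Fabian suggests above could look roughly like the sketch below: a single element type that carries any one of several record types, so one serving stream stays strongly typed without protobuf at the API boundary. Shown for three alternatives; none of these names come from FLIP-23.]

```java
import java.util.function.Function;

// Hypothetical n-ary Either (here, 3-ary): exactly one of the three
// alternatives is present, and `match` folds over the cases.
abstract class Union3<A, B, C> {
    // Exactly one of the three functions is applied, matching the case held.
    abstract <R> R match(Function<A, R> fa, Function<B, R> fb, Function<C, R> fc);

    static <A, B, C> Union3<A, B, C> first(final A a) {
        return new Union3<A, B, C>() {
            <R> R match(Function<A, R> fa, Function<B, R> fb, Function<C, R> fc) {
                return fa.apply(a);
            }
        };
    }

    static <A, B, C> Union3<A, B, C> second(final B b) {
        return new Union3<A, B, C>() {
            <R> R match(Function<A, R> fa, Function<B, R> fb, Function<C, R> fc) {
                return fb.apply(b);
            }
        };
    }

    static <A, B, C> Union3<A, B, C> third(final C c) {
        return new Union3<A, B, C>() {
            <R> R match(Function<A, R> fa, Function<B, R> fb, Function<C, R> fc) {
                return fc.apply(c);
            }
        };
    }
}
```

A serving operator over `DataStream<Union3<Double, double[], String>>` could then dispatch each element to the right model via `match` with no casts and no wire-format coupling.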
> > >>>>>
> > >>>>> 4) For the integration with the DataStream API, we could provide
> > >>>>>> an API that receives (typed) DataStream objects, internally
> > >>>>>> constructs the DataStream operators, and returns one (or more)
> > >>>>>> result DataStreams. The benefit is that we don't need to change
> > >>>>>> the DataStream API directly, but put a library on top. The other
> > >>>>>> libraries (CEP, Table, Gelly) follow this approach.
> > >>>>>>
> > >>>>> We will provide a DSL which will do just this. But even without
> > >>>>> the DSL, this is what we do with low-level joins.
> > >>>>>
> > >>>>> 5) I'm skeptical about using queryable state to expose metrics.
> > >>>>>> Did you consider using Flink's metrics system [1]? It is easily
> > >>>>>> configurable, and we provide several reporters that export the
> > >>>>>> metrics.
> > >>>>>>
> > >>>>> This of course is an option. The choice of queryable state was
> > >>>>> mostly driven by the simplicity of real-time integration. Any
> > >>>>> reason why the metrics system is better?
> > >>>>>
> > >>>>> Best,
> > >>>>> Stavros
> > >>>>>
> > >>>>> On Mon, Nov 27, 2017 at 4:23 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> > >>>>>
> > >>>>>> Hi Stavros,
> > >>>>>>
> > >>>>>> thanks for the detailed FLIP!
> > >>>>>> Model serving is an important use case and it's great to see
> > >>>>>> efforts to add a library for this to Flink!
> > >>>>>>
> > >>>>>> I've read the FLIP and would like to ask a few questions and
> > >>>>>> make some suggestions.
> > >>>>>>
> > >>>>>> 1) Is it a strict requirement that a ML pipeline must be able to
> > >>>>>> handle different input types?
> > >>>>>> I understand that it makes sense to have different models for
> > >>>>>> different instances of the same type, i.e., same data type but
> > >>>>>> different keys. Hence, the key-based joins make sense to me.
> > >>>>>> However, couldn't completely different types be handled by
> > >>>>>> different ML pipelines or would there be major drawbacks?
> > >>>>>>
> > >>>>>> 2) I think from an API point of view it would be better to not
> > >>>>>> require input records to be encoded as ProtoBuf messages.
> > >>>>>> Instead, the model server could accept strongly-typed objects
> > >>>>>> (Java/Scala) and (if necessary) convert them to ProtoBuf
> > >>>>>> messages internally. In case we need to support different types
> > >>>>>> of records (see my first point), we can introduce a Union type
> > >>>>>> (i.e., an n-ary Either type). I see that we need some kind of
> > >>>>>> binary encoding format for the models, but maybe this too can be
> > >>>>>> designed to be pluggable such that other encodings can be added
> > >>>>>> later.
> > >>>>>>
> > >>>>>> 3) I think the DataStream Java API should be supported as a
> > >>>>>> first-class citizen for this library.
> > >>>>>>
> > >>>>>> 4) For the integration with the DataStream API, we could provide
> > >>>>>> an API that receives (typed) DataStream objects, internally
> > >>>>>> constructs the DataStream operators, and returns one (or more)
> > >>>>>> result DataStreams. The benefit is that we don't need to change
> > >>>>>> the DataStream API directly, but put a library on top. The other
> > >>>>>> libraries (CEP, Table, Gelly) follow this approach.
> > >>>>>>
> > >>>>>> 5) I'm skeptical about using queryable state to expose metrics.
> > >>>>>> Did you consider using Flink's metrics system [1]? It is easily
> > >>>>>> configurable, and we provide several reporters that export the
> > >>>>>> metrics.
> > >>>>>>
> > >>>>>> What do you think?
> > >>>>>> Best, Fabian
> > >>>>>>
> > >>>>>> [1]
> > >>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html
> > >>>>>>
> > >>>>>> 2017-11-23 12:32 GMT+01:00 Stavros Kontopoulos <st.kontopou...@gmail.com>:
> > >>>>>>
> > >>>>>>> Hi guys,
> > >>>>>>>
> > >>>>>>> Let's discuss the new FLIP proposal for model serving over
> > >>>>>>> Flink. The idea is to combine previous efforts there and
> > >>>>>>> provide a library on top of Flink for serving models.
> > >>>>>>>
> > >>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-23+-+Model+Serving
> > >>>>>>>
> > >>>>>>> Code from previous efforts can be found here:
> > >>>>>>> https://github.com/FlinkML
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>> Stavros
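[Editor's note: the speculative model evaluation that Fabian mentions at the top of the thread — evaluating multiple models of varying complexity to ensure certain SLAs — can be illustrated by the control-flow sketch below. The real library would drive this with Flink operators, timers, and state; plain futures are used here only to show the idea, and all names are assumptions, not the flink-speculative-modelServer API.]

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical sketch: score the same input with a cheap and a complex
// model in parallel; prefer the complex model's score if it arrives
// within the SLA, otherwise fall back to the cheap one.
class SpeculativeScorer {
    static double scoreWithinSla(double input,
                                 Function<Double, Double> cheapModel,
                                 Function<Double, Double> complexModel,
                                 long slaMillis) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<Double> cheap = () -> cheapModel.apply(input);
            Callable<Double> complex = () -> complexModel.apply(input);
            Future<Double> cheapResult = pool.submit(cheap);
            Future<Double> complexResult = pool.submit(complex);
            try {
                // Take the complex model's answer if it meets the SLA...
                return complexResult.get(slaMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // ...otherwise fall back to the cheap model's answer.
                return cheapResult.get();
            }
        } finally {
            pool.shutdownNow();
        }
    }
}
```

In a Flink implementation the SLA deadline would come from a processing-time timer rather than a blocking `get`, but the trade-off is the same: latency is bounded by the SLA, and accuracy degrades gracefully to whatever model finished in time.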