Hi everybody,

The question of how to serve ML models in Flink applications came up in
several conversations I had with Flink users in the last months.
Recently, Boris approached me and he told me that he'd like to revive the
efforts around FLIP-23 [1].

In the last days, Boris extended the proposal by a speculative model
evaluation which allows for evaluating multiple modes of varying complexity
to ensure certain SLAs.
The code does already exist in a Github repository [2].

Due to the frequent user requests and the fact that the code is already
present, I think would be a great feature for Flink to have.
Since this is a library on top of Flink's existing APIs this should not be
too hard to review.

What do others think?

Best, Fabian

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-23+-+Model+Serving
[2] https://github.com/FlinkML/flink-speculative-modelServer

Am Mo., 5. Feb. 2018 um 13:11 Uhr schrieb Stavros Kontopoulos <
st.kontopou...@gmail.com>:

> Thanx @Fabian. I will update the document accordingly wrt metrics.
> I agree there are pros and cons.
>
> Best,
> Stavros
>
>
> On Wed, Jan 31, 2018 at 1:07 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> > OK, I think there was plenty of time to comment on this FLIP.
> > I'll move it to the ACCEPTED status.
> >
> > @Stavros, please consider the feedback regarding the metrics.
> > I agree with Chesnay that metrics should be primarily exposed via the
> > metrics system.
> > Storing them in state makes them fault-tolerant and queryable if the
> state
> > is properly configured.
> >
> > Thanks,
> > Fabian
> >
> > 2018-01-22 17:19 GMT+01:00 Chesnay Schepler <ches...@apache.org>:
> >
> > > I'm currently looking over it, but one thing that stood out was that
> the
> > > FLIP proposes to use queryable state
> > > as a monitoring solution. Given that we have a metric system that
> > > integrates with plenty of commonly used
> > > metric backends this doesn't really make sense to me.
> > >
> > > Storing them in state still has value in terms of fault-tolerance
> though,
> > > since this is something that the metric
> > > system doesn't provide by itself.
> > >
> > >
> > > On 18.01.2018 13:57, Fabian Hueske wrote:
> > >
> > >> Are there any more comments on the FLIP?
> > >>
> > >> Otherwise, I'd suggest to move the FLIP to the accepted FLIPs [1] and
> > >> continue with the implementation.
> > >>
> > >> Also, is there a committer who'd like to shepherd the FLIP and review
> > the
> > >> corresponding PRs?
> > >> Of course, everybody is welcome to review the code but we need at
> least
> > >> one
> > >> committer who will eventually merge the changes.
> > >>
> > >> Best,
> > >> Fabian
> > >>
> > >> [1]
> > >> https://cwiki.apache.org/confluence/display/FLINK/Flink+
> > >> Improvement+Proposals
> > >>
> > >> 2017-12-04 10:54 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
> > >>
> > >> Hi,
> > >>>
> > >>> Sorry for the late follow up.
> > >>>
> > >>> I think I understand the motivation for choosing ProtoBuf as the
> > >>> representation and serialization format and this makes sense to me.
> > >>>
> > >>> However, it might be a good idea to provide tooling to convert Flink
> > >>> types
> > >>> (described as TypeInformation) to ProtoBuf.
> > >>> Otherwise, users of the model serving library would need to manually
> > >>> convert their data types (say Scala tuples, case classes, or Avro
> > Pojos)
> > >>> to
> > >>> ProtoBuf messages.
> > >>> I don't think that this needs to be included in the first version but
> > it
> > >>> might be a good extension to make the library easier to use.
> > >>>
> > >>> Best,
> > >>> Fabian
> > >>>
> > >>>
> > >>>
> > >>> 2017-11-28 17:22 GMT+01:00 Boris Lublinsky <
> > >>> boris.lublin...@lightbend.com>
> > >>> :
> > >>>
> > >>> Thanks Fabian,
> > >>>> More below
> > >>>>
> > >>>>
> > >>>>
> > >>>> Boris Lublinsky
> > >>>> FDP Architect
> > >>>> boris.lublin...@lightbend.com
> > >>>> https://www.lightbend.com/
> > >>>>
> > >>>> On Nov 28, 2017, at 8:21 AM, Fabian Hueske <fhue...@gmail.com>
> wrote:
> > >>>>
> > >>>> Hi Boris and Stavros,
> > >>>>
> > >>>> Thanks for the responses.
> > >>>>
> > >>>> Ad 1) Thanks for the clarification. I think I misunderstood this
> part
> > of
> > >>>> the proposal.
> > >>>> I interpreted the argument why to chose ProtoBuf for network
> encoding
> > >>>> ("ability
> > >>>> to represent different data types") such that different a model
> > pipeline
> > >>>> should work on different data types.
> > >>>> I agree that it should be possible to give records of the same type
> > (but
> > >>>> with different keys) to different models. The key-based join
> approach
> > >>>> looks
> > >>>> good to me.
> > >>>>
> > >>>> Ad 2) I understand that ProtoBuf is a good choice to serialize
> models
> > >>>> for
> > >>>> the given reasons.
> > >>>> However, the choice of ProtoBuf serialization for the records might
> > make
> > >>>> the integration with existing libraries and also regular DataStream
> > >>>> programs more difficult.
> > >>>> They all use Flink's TypeSerializer system to serialize and
> > deserialize
> > >>>> records by default. Hence, we would need to add a conversion step
> > before
> > >>>> records can be passed to a model serving operator.
> > >>>> Are you expecting some common format that all records follow (such
> as
> > a
> > >>>> Row or Vector type) or do you plan to support arbitrary records such
> > as
> > >>>> Pojos?
> > >>>> If you plan for a specific type, you could add a TypeInformation for
> > >>>> this
> > >>>> type with a TypeSerializer that is based on ProtoBuf.
> > >>>>
> > >>>> The way I look at it is slightly different. The common format for
> > >>>> records, supported by Flink, is Byte array with a little bit of
> > header,
> > >>>> describing data type and is used for routing. The actual
> unmarshalling
> > >>>> is
> > >>>> done by the model implementation itself. This provides the maximum
> > >>>> flexibility and gives user the freedom to create his own types
> without
> > >>>> breaking underlying framework.
> > >>>>
> > >>>> Ad 4) @Boris: I made this point not about the serialization format
> but
> > >>>> how the library would integrate with Flink's DataStream API.
> > >>>> I thought I had seen a code snippet that showed a new method on the
> > >>>> DataStream object but cannot find this anymore.
> > >>>> So, I just wanted to make the point that we should not change the
> > >>>> DataStream API (unless it lacks support for some features) and built
> > the
> > >>>> model serving library on top of it.
> > >>>> But I get from Stavros answer that this is your design anyway.
> > >>>>
> > >>>> Ad 5) The metrics system is the default way to expose system and job
> > >>>> metrics in Flink. Due to the pluggable reporter interface and
> various
> > >>>> reporters, they can be easily integrated in many production
> > >>>> environments.
> > >>>> A solution based on queryable state will always need custom code to
> > >>>> access the information. Of course this can be an optional feature.
> > >>>>
> > >>>> What do others think about this proposal?
> > >>>>
> > >>>> We had agreement among work group - Eron, Bas, Andrea, etc, but you
> > are
> > >>>> the first one outside of it. My book https://www.lightbend.com
> > >>>> /blog/serving-machine-learning-models-free-oreilly-
> > ebook-from-lightbend
> > >>>> has
> > >>>>
> > >>>> a reasonably good reviews, so we are hoping this will work
> > >>>>
> > >>>>
> > >>>> Best, Fabian
> > >>>>
> > >>>>
> > >>>> 2017-11-28 13:53 GMT+01:00 Stavros Kontopoulos <
> > >>>> st.kontopou...@gmail.com>
> > >>>> :
> > >>>>
> > >>>> Hi Fabian thanx!
> > >>>>>
> > >>>>>
> > >>>>> 1) Is it a strict requirement that a ML pipeline must be able to
> > handle
> > >>>>>> different input types?
> > >>>>>> I understand that it makes sense to have different models for
> > >>>>>> different
> > >>>>>> instances of the same type, i.e., same data type but different
> keys.
> > >>>>>>
> > >>>>> Hence,
> > >>>>>
> > >>>>>> the key-based joins make sense to me. However, couldn't completely
> > >>>>>> different types be handled by different ML pipelines or would
> there
> > be
> > >>>>>> major drawbacks?
> > >>>>>>
> > >>>>>
> > >>>>> Could you elaborate more on this? Right now we only use keys when
> we
> > do
> > >>>>> the
> > >>>>> join. A given pipeline can handle only a well defined type (the
> type
> > >>>>> can
> > >>>>> be
> > >>>>> a simple string with a custom value, no need to be a
> > >>>>> class type) which serves as a key.
> > >>>>>
> > >>>>> 2)
> > >>>>>
> > >>>>> I think from an API point of view it would be better to not require
> > >>>>>
> > >>>>>> input records to be encoded as ProtoBuf messages. Instead, the
> model
> > >>>>>>
> > >>>>> server
> > >>>>>
> > >>>>>> could accept strongly-typed objects (Java/Scala) and (if
> necessary)
> > >>>>>>
> > >>>>> convert
> > >>>>>
> > >>>>>> them to ProtoBuf messages internally. In case we need to support
> > >>>>>>
> > >>>>> different
> > >>>>>
> > >>>>>> types of records (see my first point), we can introduce a Union
> type
> > >>>>>>
> > >>>>> (i.e.,
> > >>>>>
> > >>>>>> an n-ary Either type). I see that we need some kind of binary
> > encoding
> > >>>>>> format for the models but maybe also this can be designed to be
> > >>>>>>
> > >>>>> pluggable
> > >>>>>
> > >>>>>> such that later other encodings can be added.
> > >>>>>>
> > >>>>>>   We do uses scala classes (strongly typed classes), protobuf is
> > only
> > >>>>> used
> > >>>>> on the wire. For on the wire encoding we prefer protobufs for size,
> > >>>>> expressiveness and ability to represent different data types.
> > >>>>>
> > >>>>> 3)
> > >>>>>
> > >>>>> I think the DataStream Java API should be supported as a first
> class
> > >>>>>
> > >>>>>> citizens for this library.
> > >>>>>>
> > >>>>>
> > >>>>> I agree. It should be either first priority or a next thing to do.
> > >>>>>
> > >>>>>
> > >>>>> 4)
> > >>>>>
> > >>>>> For the integration with the DataStream API, we could provide an
> API
> > >>>>> that
> > >>>>>
> > >>>>>> receives (typed) DataStream objects, internally constructs the
> > >>>>>>
> > >>>>> DataStream
> > >>>>>
> > >>>>>> operators, and returns one (or more) result DataStreams. The
> benefit
> > >>>>>> is
> > >>>>>> that we don't need to change the DataStream API directly, but put
> a
> > >>>>>>
> > >>>>> library
> > >>>>>
> > >>>>>> on top. The other libraries (CEP, Table, Gelly) follow this
> > approach.
> > >>>>>>
> > >>>>>
> > >>>>>   We will provide a DSL which will do jsut this. But even without
> the
> > >>>>> DSL
> > >>>>> this is what we do with low level joins.
> > >>>>>
> > >>>>>
> > >>>>> 5)
> > >>>>>
> > >>>>> I'm skeptical about using queryable state to expose metrics. Did
> you
> > >>>>>> consider using Flink's metrics system [1]? It is easily
> configurable
> > >>>>>>
> > >>>>> and we
> > >>>>>
> > >>>>>> provided several reporters that export the metrics.
> > >>>>>>
> > >>>>>> This of course is an option. The choice of queryable state was
> > mostly
> > >>>>> driven by a simplicity of real time integration.  Any reason why
> > >>>>> metrics
> > >>>>> system is netter?
> > >>>>>
> > >>>>>
> > >>>>> Best,
> > >>>>> Stavros
> > >>>>>
> > >>>>> On Mon, Nov 27, 2017 at 4:23 PM, Fabian Hueske <fhue...@gmail.com>
> > >>>>> wrote:
> > >>>>>
> > >>>>> Hi Stavros,
> > >>>>>>
> > >>>>>> thanks for the detailed FLIP!
> > >>>>>> Model serving is an important use case and it's great to see
> efforts
> > >>>>>>
> > >>>>> to add
> > >>>>>
> > >>>>>> a library for this to Flink!
> > >>>>>>
> > >>>>>> I've read the FLIP and would like to ask a few questions and make
> > some
> > >>>>>> suggestions.
> > >>>>>>
> > >>>>>> 1) Is it a strict requirement that a ML pipeline must be able to
> > >>>>>> handle
> > >>>>>> different input types?
> > >>>>>> I understand that it makes sense to have different models for
> > >>>>>> different
> > >>>>>> instances of the same type, i.e., same data type but different
> keys.
> > >>>>>>
> > >>>>> Hence,
> > >>>>>
> > >>>>>> the key-based joins make sense to me. However, couldn't completely
> > >>>>>> different types be handled by different ML pipelines or would
> there
> > be
> > >>>>>> major drawbacks?
> > >>>>>>
> > >>>>>> 2) I think from an API point of view it would be better to not
> > require
> > >>>>>> input records to be encoded as ProtoBuf messages. Instead, the
> model
> > >>>>>>
> > >>>>> server
> > >>>>>
> > >>>>>> could accept strongly-typed objects (Java/Scala) and (if
> necessary)
> > >>>>>>
> > >>>>> convert
> > >>>>>
> > >>>>>> them to ProtoBuf messages internally. In case we need to support
> > >>>>>>
> > >>>>> different
> > >>>>>
> > >>>>>> types of records (see my first point), we can introduce a Union
> type
> > >>>>>>
> > >>>>> (i.e.,
> > >>>>>
> > >>>>>> an n-ary Either type). I see that we need some kind of binary
> > encoding
> > >>>>>> format for the models but maybe also this can be designed to be
> > >>>>>>
> > >>>>> pluggable
> > >>>>>
> > >>>>>> such that later other encodings can be added.
> > >>>>>>
> > >>>>>> 3) I think the DataStream Java API should be supported as a first
> > >>>>>> class
> > >>>>>> citizens for this library.
> > >>>>>>
> > >>>>>> 4) For the integration with the DataStream API, we could provide
> an
> > >>>>>> API
> > >>>>>> that receives (typed) DataStream objects, internally constructs
> the
> > >>>>>> DataStream operators, and returns one (or more) result
> DataStreams.
> > >>>>>> The
> > >>>>>> benefit is that we don't need to change the DataStream API
> directly,
> > >>>>>>
> > >>>>> but
> > >>>>>
> > >>>>>> put a library on top. The other libraries (CEP, Table, Gelly)
> follow
> > >>>>>>
> > >>>>> this
> > >>>>>
> > >>>>>> approach.
> > >>>>>>
> > >>>>>> 5) I'm skeptical about using queryable state to expose metrics.
> Did
> > >>>>>> you
> > >>>>>> consider using Flink's metrics system [1]? It is easily
> configurable
> > >>>>>>
> > >>>>> and we
> > >>>>>
> > >>>>>> provided several reporters that export the metrics.
> > >>>>>>
> > >>>>>> What do you think?
> > >>>>>> Best, Fabian
> > >>>>>>
> > >>>>>> [1]
> > >>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/
> > >>>>>>
> > >>>>> monitoring/
> > >>>>>
> > >>>>>> metrics.html
> > >>>>>>
> > >>>>>> 2017-11-23 12:32 GMT+01:00 Stavros Kontopoulos <
> > >>>>>>
> > >>>>> st.kontopou...@gmail.com>:
> > >>>>>
> > >>>>>> Hi guys,
> > >>>>>>>
> > >>>>>>> Let's discuss the new FLIP proposal for model serving over Flink.
> > The
> > >>>>>>>
> > >>>>>> idea
> > >>>>>>
> > >>>>>>> is to combine previous efforts there and provide a library on top
> > of
> > >>>>>>>
> > >>>>>> Flink
> > >>>>>>
> > >>>>>>> for serving models.
> > >>>>>>>
> > >>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> > >>>>>>>
> > >>>>>> 23+-+Model+Serving
> > >>>>>>
> > >>>>>>> Code from previous efforts can be found here:
> > >>>>>>>
> > >>>>>> https://github.com/FlinkML
> > >>>>>
> > >>>>>> Best,
> > >>>>>>> Stavros
> > >>>>>>>
> > >>>>>>>
> > >>>>
> > >>>>
> > >
> >
>

Reply via email to