Thanks for the contribution, Boris! I've been playing around with the basic
model for a while and loved it.
+1 and really looking forward to having the feature merged back into Flink
ML.

--
Rong

On Mon, Oct 1, 2018 at 7:55 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi everybody,
>
> The question of how to serve ML models in Flink applications came up in
> several conversations I had with Flink users in the last months.
> Recently, Boris approached me and he told me that he'd like to revive the
> efforts around FLIP-23 [1].
>
> In the last days, Boris extended the proposal with speculative model
> evaluation, which allows evaluating multiple models of varying complexity
> to ensure certain SLAs.
> The code already exists in a GitHub repository [2].
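>
> To sketch the idea of speculative evaluation (names and code here are
> illustrative only, not the actual implementation in [2]): score a record
> with several models in parallel, wait at most the SLA, and prefer the
> result of the most complex model that finished in time.
>
> import scala.concurrent.{Await, Future}
> import scala.concurrent.duration._
> import scala.concurrent.ExecutionContext.Implicits.global
> import scala.util.Try
>
> // Hypothetical sketch: `models` are ordered simplest first; the last
> // model that completes within the SLA wins, None if none met it.
> def speculativeScore[R](record: R,
>                         models: Seq[R => Double],
>                         sla: FiniteDuration): Option[Double] = {
>   val deadline = sla.fromNow
>   val attempts = models.map(m => Future(m(record)))
>   val finished = attempts.map { f =>
>     Try(Await.result(f, deadline.timeLeft max Duration.Zero)).toOption
>   }
>   finished.flatten.lastOption
> }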
>
> Due to the frequent user requests and the fact that the code is already
> present, I think this would be a great feature for Flink to have.
> Since this is a library on top of Flink's existing APIs, it should not be
> too hard to review.
>
> What do others think?
>
> Best, Fabian
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-23+-+Model+Serving
> [2] https://github.com/FlinkML/flink-speculative-modelServer
>
> On Mon, Feb 5, 2018 at 1:11 PM Stavros Kontopoulos <st.kontopou...@gmail.com>
> wrote:
>
> > Thanks @Fabian. I will update the document accordingly w.r.t. metrics.
> > I agree there are pros and cons.
> >
> > Best,
> > Stavros
> >
> >
> > > On Wed, Jan 31, 2018 at 1:07 AM, Fabian Hueske <fhue...@gmail.com>
> > > wrote:
> >
> > > OK, I think there was plenty of time to comment on this FLIP.
> > > I'll move it to the ACCEPTED status.
> > >
> > > @Stavros, please consider the feedback regarding the metrics.
> > > I agree with Chesnay that metrics should be primarily exposed via the
> > > metrics system.
> > > Storing them in state makes them fault-tolerant and queryable if the
> > > state is properly configured.
> > >
> > > Thanks,
> > > Fabian
> > >
> > > 2018-01-22 17:19 GMT+01:00 Chesnay Schepler <ches...@apache.org>:
> > >
> > > > I'm currently looking over it, but one thing that stood out was that
> > > > the FLIP proposes to use queryable state as a monitoring solution.
> > > > Given that we have a metric system that integrates with plenty of
> > > > commonly used metric backends, this doesn't really make sense to me.
> > > >
> > > > Storing them in state still has value in terms of fault tolerance
> > > > though, since this is something that the metric system doesn't
> > > > provide by itself.
> > > >
> > > >
> > > > On 18.01.2018 13:57, Fabian Hueske wrote:
> > > >
> > > >> Are there any more comments on the FLIP?
> > > >>
> > > >> Otherwise, I'd suggest moving the FLIP to the accepted FLIPs [1]
> > > >> and continuing with the implementation.
> > > >>
> > > >> Also, is there a committer who'd like to shepherd the FLIP and
> > > >> review the corresponding PRs?
> > > >> Of course, everybody is welcome to review the code but we need at
> > > >> least one committer who will eventually merge the changes.
> > > >>
> > > >> Best,
> > > >> Fabian
> > > >>
> > > >> [1]
> > > >> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> > > >>
> > > >> 2017-12-04 10:54 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
> > > >>
> > > >>> Hi,
> > > >>>
> > > >>> Sorry for the late follow up.
> > > >>>
> > > >>> I think I understand the motivation for choosing ProtoBuf as the
> > > >>> representation and serialization format and this makes sense to me.
> > > >>>
> > > >>> However, it might be a good idea to provide tooling to convert
> > > >>> Flink types (described as TypeInformation) to ProtoBuf.
> > > >>> Otherwise, users of the model serving library would need to
> > > >>> manually convert their data types (say Scala tuples, case classes,
> > > >>> or Avro POJOs) to ProtoBuf messages.
> > > >>> I don't think that this needs to be included in the first version,
> > > >>> but it might be a good extension to make the library easier to use.
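> > > >>>
> > > >>> For illustration only, such a conversion utility could have roughly
> > > >>> this shape (a hypothetical API, not something that exists today):
> > > >>>
> > > >>> import org.apache.flink.api.common.typeinfo.TypeInformation
> > > >>>
> > > >>> // Hypothetical encoder derived from a TypeInformation, so users
> > > >>> // would not hand-write the mapping for tuples/case classes/POJOs.
> > > >>> trait ProtoEncoder[T] extends Serializable {
> > > >>>   def encode(value: T): Array[Byte] // serialized ProtoBuf message
> > > >>>   def decode(bytes: Array[Byte]): T
> > > >>> }
> > > >>>
> > > >>> object ProtoEncoder {
> > > >>>   // Would inspect the type's fields and build a matching ProtoBuf
> > > >>>   // descriptor plus field mapping; left unimplemented here.
> > > >>>   def forType[T](info: TypeInformation[T]): ProtoEncoder[T] = ???
> > > >>> }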
> > > >>>
> > > >>> Best,
> > > >>> Fabian
> > > >>>
> > > >>>
> > > >>>
> > > >>> 2017-11-28 17:22 GMT+01:00 Boris Lublinsky <boris.lublin...@lightbend.com>:
> > > >>>
> > > >>>> Thanks Fabian,
> > > >>>> More below.
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> Boris Lublinsky
> > > >>>> FDP Architect
> > > >>>> boris.lublin...@lightbend.com
> > > >>>> https://www.lightbend.com/
> > > >>>>
> > > >>>> On Nov 28, 2017, at 8:21 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> > > >>>>
> > > >>>> Hi Boris and Stavros,
> > > >>>>
> > > >>>> Thanks for the responses.
> > > >>>>
> > > >>>> Ad 1) Thanks for the clarification. I think I misunderstood this
> > > >>>> part of the proposal.
> > > >>>> I interpreted the argument for choosing ProtoBuf for network
> > > >>>> encoding ("ability to represent different data types") to mean
> > > >>>> that a model pipeline should work on different data types.
> > > >>>> I agree that it should be possible to give records of the same
> > > >>>> type (but with different keys) to different models. The key-based
> > > >>>> join approach looks good to me.
> > > >>>>
> > > >>>> Ad 2) I understand that ProtoBuf is a good choice to serialize
> > > >>>> models for the given reasons.
> > > >>>> However, the choice of ProtoBuf serialization for the records might
> > > >>>> make the integration with existing libraries and also regular
> > > >>>> DataStream programs more difficult.
> > > >>>> They all use Flink's TypeSerializer system to serialize and
> > > >>>> deserialize records by default. Hence, we would need to add a
> > > >>>> conversion step before records can be passed to a model serving
> > > >>>> operator.
> > > >>>> Are you expecting some common format that all records follow (such
> > > >>>> as a Row or Vector type), or do you plan to support arbitrary
> > > >>>> records such as POJOs?
> > > >>>> If you plan for a specific type, you could add a TypeInformation
> > > >>>> for this type with a TypeSerializer that is based on ProtoBuf.
> > > >>>>
> > > >>>> The way I look at it is slightly different. The common format for
> > > >>>> records, supported by Flink, is a byte array with a small header
> > > >>>> that describes the data type and is used for routing. The actual
> > > >>>> unmarshalling is done by the model implementation itself. This
> > > >>>> provides maximum flexibility and gives users the freedom to create
> > > >>>> their own types without breaking the underlying framework.
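> > > >>>>
> > > >>>> In code, this envelope might look roughly like the following
> > > >>>> (illustrative names only, not the exact classes in the repo):
> > > >>>>
> > > >>>> // A small routing header plus an opaque payload; the framework
> > > >>>> // reads only the header and never interprets the bytes.
> > > >>>> final case class DataRecord(dataType: String, payload: Array[Byte])
> > > >>>>
> > > >>>> trait Model[R] extends Serializable {
> > > >>>>   // Each model knows how to unmarshal the payload it was built for.
> > > >>>>   def score(payload: Array[Byte]): R
> > > >>>> }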
> > > >>>>
> > > >>>> Ad 4) @Boris: I made this point not about the serialization format
> > > >>>> but about how the library would integrate with Flink's DataStream
> > > >>>> API.
> > > >>>> I thought I had seen a code snippet that showed a new method on the
> > > >>>> DataStream object but cannot find this anymore.
> > > >>>> So, I just wanted to make the point that we should not change the
> > > >>>> DataStream API (unless it lacks support for some features) and
> > > >>>> build the model serving library on top of it.
> > > >>>> But I get from Stavros' answer that this is your design anyway.
> > > >>>>
> > > >>>> Ad 5) The metrics system is the default way to expose system and
> > > >>>> job metrics in Flink. Due to the pluggable reporter interface and
> > > >>>> various reporters, they can be easily integrated in many production
> > > >>>> environments.
> > > >>>> A solution based on queryable state will always need custom code to
> > > >>>> access the information. Of course this can be an optional feature.
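> > > >>>>
> > > >>>> As a minimal sketch of what this looks like (the scoring logic is a
> > > >>>> stand-in), registering a counter with the metrics system is enough
> > > >>>> for any configured reporter to pick it up:
> > > >>>>
> > > >>>> import org.apache.flink.api.common.functions.RichMapFunction
> > > >>>> import org.apache.flink.configuration.Configuration
> > > >>>> import org.apache.flink.metrics.Counter
> > > >>>>
> > > >>>> class ScoringFunction extends RichMapFunction[String, Double] {
> > > >>>>   @transient private var served: Counter = _
> > > >>>>
> > > >>>>   override def open(parameters: Configuration): Unit = {
> > > >>>>     // Exposed via JMX, Graphite, etc., depending on the reporter.
> > > >>>>     served = getRuntimeContext.getMetricGroup.counter("servedRecords")
> > > >>>>   }
> > > >>>>
> > > >>>>   override def map(record: String): Double = {
> > > >>>>     served.inc()
> > > >>>>     record.length.toDouble // stand-in for actual model scoring
> > > >>>>   }
> > > >>>> }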
> > > >>>>
> > > >>>> What do others think about this proposal?
> > > >>>>
> > > >>>> We had agreement among the working group (Eron, Bas, Andrea,
> > > >>>> etc.), but you are the first one outside of it. My book
> > > >>>> https://www.lightbend.com/blog/serving-machine-learning-models-free-oreilly-ebook-from-lightbend
> > > >>>> has reasonably good reviews, so we are hoping this will work.
> > > >>>>
> > > >>>>
> > > >>>> Best, Fabian
> > > >>>>
> > > >>>>
> > > >>>> 2017-11-28 13:53 GMT+01:00 Stavros Kontopoulos <st.kontopou...@gmail.com>:
> > > >>>>
> > > >>>>> Hi Fabian, thanks!
> > > >>>>>
> > > >>>>>
> > > >>>>>> 1) Is it a strict requirement that a ML pipeline must be able to
> > > >>>>>> handle different input types?
> > > >>>>>> I understand that it makes sense to have different models for
> > > >>>>>> different instances of the same type, i.e., same data type but
> > > >>>>>> different keys. Hence, the key-based joins make sense to me.
> > > >>>>>> However, couldn't completely different types be handled by
> > > >>>>>> different ML pipelines or would there be major drawbacks?
> > > >>>>>>
> > > >>>>>
> > > >>>>> Could you elaborate more on this? Right now we only use keys when
> > > >>>>> we do the join. A given pipeline can handle only a well-defined
> > > >>>>> type (the type can be a simple string with a custom value, no need
> > > >>>>> to be a class type), which serves as a key.
> > > >>>>>
> > > >>>>> 2)
> > > >>>>>
> > > >>>>>> I think from an API point of view it would be better to not
> > > >>>>>> require input records to be encoded as ProtoBuf messages.
> > > >>>>>> Instead, the model server could accept strongly-typed objects
> > > >>>>>> (Java/Scala) and (if necessary) convert them to ProtoBuf messages
> > > >>>>>> internally. In case we need to support different types of records
> > > >>>>>> (see my first point), we can introduce a Union type (i.e., an
> > > >>>>>> n-ary Either type). I see that we need some kind of binary
> > > >>>>>> encoding format for the models but maybe also this can be
> > > >>>>>> designed to be pluggable such that later other encodings can be
> > > >>>>>> added.
> > > >>>>>>
> > > >>>>> We do use Scala classes (strongly typed classes); ProtoBuf is
> > > >>>>> only used on the wire. For on-the-wire encoding we prefer ProtoBuf
> > > >>>>> for its size, expressiveness, and ability to represent different
> > > >>>>> data types.
> > > >>>>>
> > > >>>>> 3)
> > > >>>>>
> > > >>>>>> I think the DataStream Java API should be supported as a
> > > >>>>>> first-class citizen for this library.
> > > >>>>>
> > > >>>>> I agree. It should be either a first priority or the next thing
> > > >>>>> to do.
> > > >>>>>
> > > >>>>>
> > > >>>>> 4)
> > > >>>>>
> > > >>>>>> For the integration with the DataStream API, we could provide an
> > > >>>>>> API that receives (typed) DataStream objects, internally
> > > >>>>>> constructs the DataStream operators, and returns one (or more)
> > > >>>>>> result DataStreams. The benefit is that we don't need to change
> > > >>>>>> the DataStream API directly, but put a library on top. The other
> > > >>>>>> libraries (CEP, Table, Gelly) follow this approach.
> > > >>>>>
> > > >>>>> We will provide a DSL which will do just this. But even without
> > > >>>>> the DSL, this is what we do with low-level joins.
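> > > >>>>>
> > > >>>>> A minimal sketch of such a low-level join (illustrative types and
> > > >>>>> names, not the FLIP's actual classes): both streams are keyed by
> > > >>>>> the data type, and the current model is kept in keyed state:
> > > >>>>>
> > > >>>>> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
> > > >>>>> import org.apache.flink.configuration.Configuration
> > > >>>>> import org.apache.flink.streaming.api.functions.co.CoProcessFunction
> > > >>>>> import org.apache.flink.util.Collector
> > > >>>>>
> > > >>>>> trait Model extends Serializable { def score(p: Array[Byte]): Double }
> > > >>>>>
> > > >>>>> // Inputs are (dataType, payload) and (dataType, model); applied as
> > > >>>>> // data.keyBy(_._1).connect(models.keyBy(_._1)).process(new Server)
> > > >>>>> class Server
> > > >>>>>     extends CoProcessFunction[(String, Array[Byte]), (String, Model), Double] {
> > > >>>>>   @transient private var current: ValueState[Model] = _
> > > >>>>>
> > > >>>>>   override def open(parameters: Configuration): Unit =
> > > >>>>>     current = getRuntimeContext.getState(
> > > >>>>>       new ValueStateDescriptor[Model]("model", classOf[Model]))
> > > >>>>>
> > > >>>>>   override def processElement1(
> > > >>>>>       in: (String, Array[Byte]),
> > > >>>>>       ctx: CoProcessFunction[(String, Array[Byte]), (String, Model), Double]#Context,
> > > >>>>>       out: Collector[Double]): Unit =
> > > >>>>>     Option(current.value()).foreach(m => out.collect(m.score(in._2)))
> > > >>>>>
> > > >>>>>   override def processElement2(
> > > >>>>>       m: (String, Model),
> > > >>>>>       ctx: CoProcessFunction[(String, Array[Byte]), (String, Model), Double]#Context,
> > > >>>>>       out: Collector[Double]): Unit =
> > > >>>>>     current.update(m._2) // swap in the new model for this key
> > > >>>>> }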
> > > >>>>>
> > > >>>>>
> > > >>>>> 5)
> > > >>>>>
> > > >>>>>> I'm skeptical about using queryable state to expose metrics. Did
> > > >>>>>> you consider using Flink's metrics system [1]? It is easily
> > > >>>>>> configurable and we provide several reporters that export the
> > > >>>>>> metrics.
> > > >>>>>
> > > >>>>> This of course is an option. The choice of queryable state was
> > > >>>>> mostly driven by the simplicity of real-time integration. Any
> > > >>>>> reason why the metrics system is better?
> > > >>>>>
> > > >>>>>
> > > >>>>> Best,
> > > >>>>> Stavros
> > > >>>>>
> > > >>>>> On Mon, Nov 27, 2017 at 4:23 PM, Fabian Hueske <fhue...@gmail.com>
> > > >>>>> wrote:
> > > >>>>>
> > > >>>>>> Hi Stavros,
> > > >>>>>>
> > > >>>>>> thanks for the detailed FLIP!
> > > >>>>>> Model serving is an important use case and it's great to see
> > > >>>>>> efforts to add a library for this to Flink!
> > > >>>>>>
> > > >>>>>> I've read the FLIP and would like to ask a few questions and
> > > >>>>>> make some suggestions.
> > > >>>>>>
> > > >>>>>> 1) Is it a strict requirement that a ML pipeline must be able to
> > > >>>>>> handle different input types?
> > > >>>>>> I understand that it makes sense to have different models for
> > > >>>>>> different instances of the same type, i.e., same data type but
> > > >>>>>> different keys. Hence, the key-based joins make sense to me.
> > > >>>>>> However, couldn't completely different types be handled by
> > > >>>>>> different ML pipelines or would there be major drawbacks?
> > > >>>>>>
> > > >>>>>> 2) I think from an API point of view it would be better to not
> > > >>>>>> require input records to be encoded as ProtoBuf messages.
> > > >>>>>> Instead, the model server could accept strongly-typed objects
> > > >>>>>> (Java/Scala) and (if necessary) convert them to ProtoBuf messages
> > > >>>>>> internally. In case we need to support different types of records
> > > >>>>>> (see my first point), we can introduce a Union type (i.e., an
> > > >>>>>> n-ary Either type). I see that we need some kind of binary
> > > >>>>>> encoding format for the models but maybe also this can be
> > > >>>>>> designed to be pluggable such that later other encodings can be
> > > >>>>>> added.
> > > >>>>>>
> > > >>>>>> 3) I think the DataStream Java API should be supported as a
> > > >>>>>> first-class citizen for this library.
> > > >>>>>>
> > > >>>>>> 4) For the integration with the DataStream API, we could provide
> > > >>>>>> an API that receives (typed) DataStream objects, internally
> > > >>>>>> constructs the DataStream operators, and returns one (or more)
> > > >>>>>> result DataStreams. The benefit is that we don't need to change
> > > >>>>>> the DataStream API directly, but put a library on top. The other
> > > >>>>>> libraries (CEP, Table, Gelly) follow this approach.
> > > >>>>>>
> > > >>>>>> 5) I'm skeptical about using queryable state to expose metrics.
> > > >>>>>> Did you consider using Flink's metrics system [1]? It is easily
> > > >>>>>> configurable and we provide several reporters that export the
> > > >>>>>> metrics.
> > > >>>>>>
> > > >>>>>> What do you think?
> > > >>>>>> Best, Fabian
> > > >>>>>>
> > > >>>>>> [1]
> > > >>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html
> > > >>>>>>
> > > >>>>>> 2017-11-23 12:32 GMT+01:00 Stavros Kontopoulos <st.kontopou...@gmail.com>:
> > > >>>>>
> > > >>>>>>> Hi guys,
> > > >>>>>>>
> > > >>>>>>> Let's discuss the new FLIP proposal for model serving over
> > > >>>>>>> Flink. The idea is to combine previous efforts there and provide
> > > >>>>>>> a library on top of Flink for serving models.
> > > >>>>>>>
> > > >>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-23+-+Model+Serving
> > > >>>>>>>
> > > >>>>>>> Code from previous efforts can be found here:
> > > >>>>>>> https://github.com/FlinkML
> > > >>>>>>>
> > > >>>>>>> Best,
> > > >>>>>>> Stavros
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>
> > > >>>>
> > > >
> > >
> >
>
