Hi Robert, Boris,

Sorry for the late reply. I took some time to look at the implementation. I
think it looks good overall to me.
Since this is pretty much a big PR. I would like to raise a bit of
discussion beforehand:

1. In order to simplify the PR complexity, can we do the following?
- the example in a separated PR
- either the Java or Scala implementation of the model-serving module into
a separated PR (they look pretty much similar to me)
This way it makes the PR more clean and readable.

2.  I think the idea to carve out this implementation into:
    - model layer;
    - serving layer;
    - queryable state API
is a very good idea! On a higher level, this should enable usage of any
model in serving perspective, so the question is: does DataConverter /
Model conformed with Ski-learn's pipeline definition (My read on this is:
yes). My understanding is this is generic enough to support any "Model",
for example the model defined in FLIP-39, but it might be a good idea for
the authors to also take a look (CCed: @weihua, @shaoxuan)

3. Implementation
- following up with #1, can we move the architecture to
    - model-server-base
    - model-server-java/scala //specific to each language
There are many duplicate codes between the Java and Scala implementations
that I think might be able to put them as a base, for example, implemented
in Scala (since most of the basic math / ML packages[1][2] are all
implemented Scala-based).

- Type system of this PR does not use too much of Flink's type information
(other than the ByteArray, which is a pass-through), maybe we can leverage
this for the strong-type guarantee on the data processor side. (CCed @timo
who might have some idea regarding the type system in a whole, as he is
working on the table type system FLIP-37). My thoughts is, the model itself
is protobuf thus must be in ByteArray, but some auxiliary data (stats,
additional data type, record type, etc) can use Flink's type system.

I will finish the review and add comments to the PR asap.

Thanks,
Rong

[1] https://github.com/scalanlp/breeze
[2] https://spark.apache.org/docs/latest/ml-guide.html

On Tue, Apr 30, 2019 at 2:33 AM Robert Metzger <rmetz...@apache.org> wrote:

> Hey all,
>
> I'm wondering if somebody on the list can take a look at the PR from
> FLIP-23: https://github.com/apache/flink/pull/7446
>
>
> On Mon, Oct 1, 2018 at 6:13 PM Rong Rong <walter...@gmail.com> wrote:
>
> > Thanks for the contribution Boris!! I've been playing around with the
> basic
> > model for a while back and loved it.
> > +1 and really looking forward to having the feature merging back to Flink
> > ML.
> >
> > --
> > Rong
> >
> > On Mon, Oct 1, 2018 at 7:55 AM Fabian Hueske <fhue...@gmail.com> wrote:
> >
> > > Hi everybody,
> > >
> > > The question of how to serve ML models in Flink applications came up in
> > > several conversations I had with Flink users in the last months.
> > > Recently, Boris approached me and he told me that he'd like to revive
> the
> > > efforts around FLIP-23 [1].
> > >
> > > In the last days, Boris extended the proposal by a speculative model
> > > evaluation which allows for evaluating multiple modes of varying
> > complexity
> > > to ensure certain SLAs.
> > > The code does already exist in a Github repository [2].
> > >
> > > Due to the frequent user requests and the fact that the code is already
> > > present, I think would be a great feature for Flink to have.
> > > Since this is a library on top of Flink's existing APIs this should not
> > be
> > > too hard to review.
> > >
> > > What do others think?
> > >
> > > Best, Fabian
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-23+-+Model+Serving
> > > [2] https://github.com/FlinkML/flink-speculative-modelServer
> > >
> > > Am Mo., 5. Feb. 2018 um 13:11 Uhr schrieb Stavros Kontopoulos <
> > > st.kontopou...@gmail.com>:
> > >
> > > > Thanx @Fabian. I will update the document accordingly wrt metrics.
> > > > I agree there are pros and cons.
> > > >
> > > > Best,
> > > > Stavros
> > > >
> > > >
> > > > On Wed, Jan 31, 2018 at 1:07 AM, Fabian Hueske <fhue...@gmail.com>
> > > wrote:
> > > >
> > > > > OK, I think there was plenty of time to comment on this FLIP.
> > > > > I'll move it to the ACCEPTED status.
> > > > >
> > > > > @Stavros, please consider the feedback regarding the metrics.
> > > > > I agree with Chesnay that metrics should be primarily exposed via
> the
> > > > > metrics system.
> > > > > Storing them in state makes them fault-tolerant and queryable if
> the
> > > > state
> > > > > is properly configured.
> > > > >
> > > > > Thanks,
> > > > > Fabian
> > > > >
> > > > > 2018-01-22 17:19 GMT+01:00 Chesnay Schepler <ches...@apache.org>:
> > > > >
> > > > > > I'm currently looking over it, but one thing that stood out was
> > that
> > > > the
> > > > > > FLIP proposes to use queryable state
> > > > > > as a monitoring solution. Given that we have a metric system that
> > > > > > integrates with plenty of commonly used
> > > > > > metric backends this doesn't really make sense to me.
> > > > > >
> > > > > > Storing them in state still has value in terms of fault-tolerance
> > > > though,
> > > > > > since this is something that the metric
> > > > > > system doesn't provide by itself.
> > > > > >
> > > > > >
> > > > > > On 18.01.2018 13:57, Fabian Hueske wrote:
> > > > > >
> > > > > >> Are there any more comments on the FLIP?
> > > > > >>
> > > > > >> Otherwise, I'd suggest to move the FLIP to the accepted FLIPs
> [1]
> > > and
> > > > > >> continue with the implementation.
> > > > > >>
> > > > > >> Also, is there a committer who'd like to shepherd the FLIP and
> > > review
> > > > > the
> > > > > >> corresponding PRs?
> > > > > >> Of course, everybody is welcome to review the code but we need
> at
> > > > least
> > > > > >> one
> > > > > >> committer who will eventually merge the changes.
> > > > > >>
> > > > > >> Best,
> > > > > >> Fabian
> > > > > >>
> > > > > >> [1]
> > > > > >> https://cwiki.apache.org/confluence/display/FLINK/Flink+
> > > > > >> Improvement+Proposals
> > > > > >>
> > > > > >> 2017-12-04 10:54 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
> > > > > >>
> > > > > >> Hi,
> > > > > >>>
> > > > > >>> Sorry for the late follow up.
> > > > > >>>
> > > > > >>> I think I understand the motivation for choosing ProtoBuf as
> the
> > > > > >>> representation and serialization format and this makes sense to
> > me.
> > > > > >>>
> > > > > >>> However, it might be a good idea to provide tooling to convert
> > > Flink
> > > > > >>> types
> > > > > >>> (described as TypeInformation) to ProtoBuf.
> > > > > >>> Otherwise, users of the model serving library would need to
> > > manually
> > > > > >>> convert their data types (say Scala tuples, case classes, or
> Avro
> > > > > Pojos)
> > > > > >>> to
> > > > > >>> ProtoBuf messages.
> > > > > >>> I don't think that this needs to be included in the first
> version
> > > but
> > > > > it
> > > > > >>> might be a good extension to make the library easier to use.
> > > > > >>>
> > > > > >>> Best,
> > > > > >>> Fabian
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>> 2017-11-28 17:22 GMT+01:00 Boris Lublinsky <
> > > > > >>> boris.lublin...@lightbend.com>
> > > > > >>> :
> > > > > >>>
> > > > > >>> Thanks Fabian,
> > > > > >>>> More below
> > > > > >>>>
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> Boris Lublinsky
> > > > > >>>> FDP Architect
> > > > > >>>> boris.lublin...@lightbend.com
> > > > > >>>> https://www.lightbend.com/
> > > > > >>>>
> > > > > >>>> On Nov 28, 2017, at 8:21 AM, Fabian Hueske <fhue...@gmail.com
> >
> > > > wrote:
> > > > > >>>>
> > > > > >>>> Hi Boris and Stavros,
> > > > > >>>>
> > > > > >>>> Thanks for the responses.
> > > > > >>>>
> > > > > >>>> Ad 1) Thanks for the clarification. I think I misunderstood
> this
> > > > part
> > > > > of
> > > > > >>>> the proposal.
> > > > > >>>> I interpreted the argument why to chose ProtoBuf for network
> > > > encoding
> > > > > >>>> ("ability
> > > > > >>>> to represent different data types") such that different a
> model
> > > > > pipeline
> > > > > >>>> should work on different data types.
> > > > > >>>> I agree that it should be possible to give records of the same
> > > type
> > > > > (but
> > > > > >>>> with different keys) to different models. The key-based join
> > > > approach
> > > > > >>>> looks
> > > > > >>>> good to me.
> > > > > >>>>
> > > > > >>>> Ad 2) I understand that ProtoBuf is a good choice to serialize
> > > > models
> > > > > >>>> for
> > > > > >>>> the given reasons.
> > > > > >>>> However, the choice of ProtoBuf serialization for the records
> > > might
> > > > > make
> > > > > >>>> the integration with existing libraries and also regular
> > > DataStream
> > > > > >>>> programs more difficult.
> > > > > >>>> They all use Flink's TypeSerializer system to serialize and
> > > > > deserialize
> > > > > >>>> records by default. Hence, we would need to add a conversion
> > step
> > > > > before
> > > > > >>>> records can be passed to a model serving operator.
> > > > > >>>> Are you expecting some common format that all records follow
> > (such
> > > > as
> > > > > a
> > > > > >>>> Row or Vector type) or do you plan to support arbitrary
> records
> > > such
> > > > > as
> > > > > >>>> Pojos?
> > > > > >>>> If you plan for a specific type, you could add a
> TypeInformation
> > > for
> > > > > >>>> this
> > > > > >>>> type with a TypeSerializer that is based on ProtoBuf.
> > > > > >>>>
> > > > > >>>> The way I look at it is slightly different. The common format
> > for
> > > > > >>>> records, supported by Flink, is Byte array with a little bit
> of
> > > > > header,
> > > > > >>>> describing data type and is used for routing. The actual
> > > > unmarshalling
> > > > > >>>> is
> > > > > >>>> done by the model implementation itself. This provides the
> > maximum
> > > > > >>>> flexibility and gives user the freedom to create his own types
> > > > without
> > > > > >>>> breaking underlying framework.
> > > > > >>>>
> > > > > >>>> Ad 4) @Boris: I made this point not about the serialization
> > format
> > > > but
> > > > > >>>> how the library would integrate with Flink's DataStream API.
> > > > > >>>> I thought I had seen a code snippet that showed a new method
> on
> > > the
> > > > > >>>> DataStream object but cannot find this anymore.
> > > > > >>>> So, I just wanted to make the point that we should not change
> > the
> > > > > >>>> DataStream API (unless it lacks support for some features) and
> > > built
> > > > > the
> > > > > >>>> model serving library on top of it.
> > > > > >>>> But I get from Stavros answer that this is your design anyway.
> > > > > >>>>
> > > > > >>>> Ad 5) The metrics system is the default way to expose system
> and
> > > job
> > > > > >>>> metrics in Flink. Due to the pluggable reporter interface and
> > > > various
> > > > > >>>> reporters, they can be easily integrated in many production
> > > > > >>>> environments.
> > > > > >>>> A solution based on queryable state will always need custom
> code
> > > to
> > > > > >>>> access the information. Of course this can be an optional
> > feature.
> > > > > >>>>
> > > > > >>>> What do others think about this proposal?
> > > > > >>>>
> > > > > >>>> We had agreement among work group - Eron, Bas, Andrea, etc,
> but
> > > you
> > > > > are
> > > > > >>>> the first one outside of it. My book
> https://www.lightbend.com
> > > > > >>>> /blog/serving-machine-learning-models-free-oreilly-
> > > > > ebook-from-lightbend
> > > > > >>>> has
> > > > > >>>>
> > > > > >>>> a reasonably good reviews, so we are hoping this will work
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> Best, Fabian
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> 2017-11-28 13:53 GMT+01:00 Stavros Kontopoulos <
> > > > > >>>> st.kontopou...@gmail.com>
> > > > > >>>> :
> > > > > >>>>
> > > > > >>>> Hi Fabian thanx!
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> 1) Is it a strict requirement that a ML pipeline must be able
> > to
> > > > > handle
> > > > > >>>>>> different input types?
> > > > > >>>>>> I understand that it makes sense to have different models
> for
> > > > > >>>>>> different
> > > > > >>>>>> instances of the same type, i.e., same data type but
> different
> > > > keys.
> > > > > >>>>>>
> > > > > >>>>> Hence,
> > > > > >>>>>
> > > > > >>>>>> the key-based joins make sense to me. However, couldn't
> > > completely
> > > > > >>>>>> different types be handled by different ML pipelines or
> would
> > > > there
> > > > > be
> > > > > >>>>>> major drawbacks?
> > > > > >>>>>>
> > > > > >>>>>
> > > > > >>>>> Could you elaborate more on this? Right now we only use keys
> > when
> > > > we
> > > > > do
> > > > > >>>>> the
> > > > > >>>>> join. A given pipeline can handle only a well defined type
> (the
> > > > type
> > > > > >>>>> can
> > > > > >>>>> be
> > > > > >>>>> a simple string with a custom value, no need to be a
> > > > > >>>>> class type) which serves as a key.
> > > > > >>>>>
> > > > > >>>>> 2)
> > > > > >>>>>
> > > > > >>>>> I think from an API point of view it would be better to not
> > > require
> > > > > >>>>>
> > > > > >>>>>> input records to be encoded as ProtoBuf messages. Instead,
> the
> > > > model
> > > > > >>>>>>
> > > > > >>>>> server
> > > > > >>>>>
> > > > > >>>>>> could accept strongly-typed objects (Java/Scala) and (if
> > > > necessary)
> > > > > >>>>>>
> > > > > >>>>> convert
> > > > > >>>>>
> > > > > >>>>>> them to ProtoBuf messages internally. In case we need to
> > support
> > > > > >>>>>>
> > > > > >>>>> different
> > > > > >>>>>
> > > > > >>>>>> types of records (see my first point), we can introduce a
> > Union
> > > > type
> > > > > >>>>>>
> > > > > >>>>> (i.e.,
> > > > > >>>>>
> > > > > >>>>>> an n-ary Either type). I see that we need some kind of
> binary
> > > > > encoding
> > > > > >>>>>> format for the models but maybe also this can be designed to
> > be
> > > > > >>>>>>
> > > > > >>>>> pluggable
> > > > > >>>>>
> > > > > >>>>>> such that later other encodings can be added.
> > > > > >>>>>>
> > > > > >>>>>>   We do uses scala classes (strongly typed classes),
> protobuf
> > is
> > > > > only
> > > > > >>>>> used
> > > > > >>>>> on the wire. For on the wire encoding we prefer protobufs for
> > > size,
> > > > > >>>>> expressiveness and ability to represent different data types.
> > > > > >>>>>
> > > > > >>>>> 3)
> > > > > >>>>>
> > > > > >>>>> I think the DataStream Java API should be supported as a
> first
> > > > class
> > > > > >>>>>
> > > > > >>>>>> citizens for this library.
> > > > > >>>>>>
> > > > > >>>>>
> > > > > >>>>> I agree. It should be either first priority or a next thing
> to
> > > do.
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> 4)
> > > > > >>>>>
> > > > > >>>>> For the integration with the DataStream API, we could provide
> > an
> > > > API
> > > > > >>>>> that
> > > > > >>>>>
> > > > > >>>>>> receives (typed) DataStream objects, internally constructs
> the
> > > > > >>>>>>
> > > > > >>>>> DataStream
> > > > > >>>>>
> > > > > >>>>>> operators, and returns one (or more) result DataStreams. The
> > > > benefit
> > > > > >>>>>> is
> > > > > >>>>>> that we don't need to change the DataStream API directly,
> but
> > > put
> > > > a
> > > > > >>>>>>
> > > > > >>>>> library
> > > > > >>>>>
> > > > > >>>>>> on top. The other libraries (CEP, Table, Gelly) follow this
> > > > > approach.
> > > > > >>>>>>
> > > > > >>>>>
> > > > > >>>>>   We will provide a DSL which will do jsut this. But even
> > without
> > > > the
> > > > > >>>>> DSL
> > > > > >>>>> this is what we do with low level joins.
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> 5)
> > > > > >>>>>
> > > > > >>>>> I'm skeptical about using queryable state to expose metrics.
> > Did
> > > > you
> > > > > >>>>>> consider using Flink's metrics system [1]? It is easily
> > > > configurable
> > > > > >>>>>>
> > > > > >>>>> and we
> > > > > >>>>>
> > > > > >>>>>> provided several reporters that export the metrics.
> > > > > >>>>>>
> > > > > >>>>>> This of course is an option. The choice of queryable state
> was
> > > > > mostly
> > > > > >>>>> driven by a simplicity of real time integration.  Any reason
> > why
> > > > > >>>>> metrics
> > > > > >>>>> system is netter?
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> Best,
> > > > > >>>>> Stavros
> > > > > >>>>>
> > > > > >>>>> On Mon, Nov 27, 2017 at 4:23 PM, Fabian Hueske <
> > > fhue...@gmail.com>
> > > > > >>>>> wrote:
> > > > > >>>>>
> > > > > >>>>> Hi Stavros,
> > > > > >>>>>>
> > > > > >>>>>> thanks for the detailed FLIP!
> > > > > >>>>>> Model serving is an important use case and it's great to see
> > > > efforts
> > > > > >>>>>>
> > > > > >>>>> to add
> > > > > >>>>>
> > > > > >>>>>> a library for this to Flink!
> > > > > >>>>>>
> > > > > >>>>>> I've read the FLIP and would like to ask a few questions and
> > > make
> > > > > some
> > > > > >>>>>> suggestions.
> > > > > >>>>>>
> > > > > >>>>>> 1) Is it a strict requirement that a ML pipeline must be
> able
> > to
> > > > > >>>>>> handle
> > > > > >>>>>> different input types?
> > > > > >>>>>> I understand that it makes sense to have different models
> for
> > > > > >>>>>> different
> > > > > >>>>>> instances of the same type, i.e., same data type but
> different
> > > > keys.
> > > > > >>>>>>
> > > > > >>>>> Hence,
> > > > > >>>>>
> > > > > >>>>>> the key-based joins make sense to me. However, couldn't
> > > completely
> > > > > >>>>>> different types be handled by different ML pipelines or
> would
> > > > there
> > > > > be
> > > > > >>>>>> major drawbacks?
> > > > > >>>>>>
> > > > > >>>>>> 2) I think from an API point of view it would be better to
> not
> > > > > require
> > > > > >>>>>> input records to be encoded as ProtoBuf messages. Instead,
> the
> > > > model
> > > > > >>>>>>
> > > > > >>>>> server
> > > > > >>>>>
> > > > > >>>>>> could accept strongly-typed objects (Java/Scala) and (if
> > > > necessary)
> > > > > >>>>>>
> > > > > >>>>> convert
> > > > > >>>>>
> > > > > >>>>>> them to ProtoBuf messages internally. In case we need to
> > support
> > > > > >>>>>>
> > > > > >>>>> different
> > > > > >>>>>
> > > > > >>>>>> types of records (see my first point), we can introduce a
> > Union
> > > > type
> > > > > >>>>>>
> > > > > >>>>> (i.e.,
> > > > > >>>>>
> > > > > >>>>>> an n-ary Either type). I see that we need some kind of
> binary
> > > > > encoding
> > > > > >>>>>> format for the models but maybe also this can be designed to
> > be
> > > > > >>>>>>
> > > > > >>>>> pluggable
> > > > > >>>>>
> > > > > >>>>>> such that later other encodings can be added.
> > > > > >>>>>>
> > > > > >>>>>> 3) I think the DataStream Java API should be supported as a
> > > first
> > > > > >>>>>> class
> > > > > >>>>>> citizens for this library.
> > > > > >>>>>>
> > > > > >>>>>> 4) For the integration with the DataStream API, we could
> > provide
> > > > an
> > > > > >>>>>> API
> > > > > >>>>>> that receives (typed) DataStream objects, internally
> > constructs
> > > > the
> > > > > >>>>>> DataStream operators, and returns one (or more) result
> > > > DataStreams.
> > > > > >>>>>> The
> > > > > >>>>>> benefit is that we don't need to change the DataStream API
> > > > directly,
> > > > > >>>>>>
> > > > > >>>>> but
> > > > > >>>>>
> > > > > >>>>>> put a library on top. The other libraries (CEP, Table,
> Gelly)
> > > > follow
> > > > > >>>>>>
> > > > > >>>>> this
> > > > > >>>>>
> > > > > >>>>>> approach.
> > > > > >>>>>>
> > > > > >>>>>> 5) I'm skeptical about using queryable state to expose
> > metrics.
> > > > Did
> > > > > >>>>>> you
> > > > > >>>>>> consider using Flink's metrics system [1]? It is easily
> > > > configurable
> > > > > >>>>>>
> > > > > >>>>> and we
> > > > > >>>>>
> > > > > >>>>>> provided several reporters that export the metrics.
> > > > > >>>>>>
> > > > > >>>>>> What do you think?
> > > > > >>>>>> Best, Fabian
> > > > > >>>>>>
> > > > > >>>>>> [1]
> > > > > >>>>>>
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/
> > > > > >>>>>>
> > > > > >>>>> monitoring/
> > > > > >>>>>
> > > > > >>>>>> metrics.html
> > > > > >>>>>>
> > > > > >>>>>> 2017-11-23 12:32 GMT+01:00 Stavros Kontopoulos <
> > > > > >>>>>>
> > > > > >>>>> st.kontopou...@gmail.com>:
> > > > > >>>>>
> > > > > >>>>>> Hi guys,
> > > > > >>>>>>>
> > > > > >>>>>>> Let's discuss the new FLIP proposal for model serving over
> > > Flink.
> > > > > The
> > > > > >>>>>>>
> > > > > >>>>>> idea
> > > > > >>>>>>
> > > > > >>>>>>> is to combine previous efforts there and provide a library
> on
> > > top
> > > > > of
> > > > > >>>>>>>
> > > > > >>>>>> Flink
> > > > > >>>>>>
> > > > > >>>>>>> for serving models.
> > > > > >>>>>>>
> > > > > >>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> > > > > >>>>>>>
> > > > > >>>>>> 23+-+Model+Serving
> > > > > >>>>>>
> > > > > >>>>>>> Code from previous efforts can be found here:
> > > > > >>>>>>>
> > > > > >>>>>> https://github.com/FlinkML
> > > > > >>>>>
> > > > > >>>>>> Best,
> > > > > >>>>>>> Stavros
> > > > > >>>>>>>
> > > > > >>>>>>>
> > > > > >>>>
> > > > > >>>>
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to