Hi Arvid,

It is correct that we cannot embed endpoint configuration into the
state snapshot as it may change.

Why access a schema registry for internal state? Based on current
TypeSerializer assumptions, we should know all reader schemas at graph
construction time. But let's consider a generic transformation like
filtering/routing/join that only requires knowledge of a few common
characteristics of records. It does not require the exact schema and
should not require restarting to be able to handle records with a new
schema/version.

It appears that such use cases cannot be supported with TypeSerializer:

1. I tried Seth's suggestion with CompatibleAsIs (with the heap state
backend). Unfortunately the TypeSerializer that is used to deserialize
is still the one restored from snapshot.

2. I attempted to create a TypeSerializer that keeps track of all
schemas seen during serialization so that it can be restored without
external dependency. Unfortunately that also cannot work, because
savepoint creation will first write the serializers before they are
used to write the records.

Thomas

On Thu, Nov 11, 2021 at 12:37 AM Arvid Heise <ar...@apache.org> wrote:
>
> Thanks for clarifying the issue. I'm assuming you can't go the state 
> migration route because the old endpoint may not be reachable anymore? Is 
> that correct?
>
> Why do you need to use a schema registry for internal state anyways? That's a 
> very atypical use case. You would usually like to exert full control over the 
> schema of state within your application. What happens if the schema in the 
> registry adds new fields? They would be completely ignored without 
> programmatic changes. Or do also compile the query dynamically?
>
> Usually, you'd treat state like a file and embed the writer schema in the 
> state, so there is no need for an external registry. That should also work 
> with dynamic queries. Only when you output messages into an event queue, 
> you'd need the schema registry but that's orthogonal.
>
> On Wed, Nov 10, 2021 at 7:09 PM Thomas Weise <t...@apache.org> wrote:
>>
>> Thanks for the feedback!
>>
>> @Seth
>>
>> Your suggestion should work, I yet have to try it out. However relying on 
>> undocumented behavior (return CompatibleAsIs and its serializer will never 
>> be used) would make me hesitant to adopt it as permanent solution.
>>
>> @Arvid
>>
>> There is no issue with constructing the deserializer via main and TypeInfo. 
>> Although a more explicit way to inject dependencies would be desirable, 
>> along the lines of the open method elsewhere. The bigger limitation I run 
>> into is the restore from snapshot behavior, which requires to construct the 
>> TypeSerializer with the no arg constructor and no alternative way to pass a 
>> dependency.
>>
>> Thanks,
>> Thomas
>>
>>
>> On 2021/11/10 13:23:21 Seth Wiesman wrote:
>> > Yes I did, thanks for sending it back :) Copying my previous reply for the
>> > ML:
>> >
>> > Hey Thomas,
>> > >
>> > > You are correct that there is no way to inject dynamic information into
>> > > the TypeSerializer configured from the TypeSerializerSnapshot, but that
>> > > should not be a problem for your use case.
>> > >
>> > > The type serializer instantiated from a TypeSerializerSnapshot is only
>> > > used to perform schema migrations. Assuming the schema registry enforces
>> > > all changes are backwards compatible, your snapshot instance can always
>> > > return CompatibleAsIs and its serializer will never be used.
>> > >
>> > > The tradeoff here is that when the schema does change, Flink will not
>> > > eagerly migrate all values in state but instead lazily migrate as state
>> > > values are updated.
>> > >
>> > > Seth
>> > >
>> >
>> > Currently the TypeSerializerSnapshot logic is completely deterministic, and
>> > my intuition is that we should not change that. Please let us know if what
>> > I described does not work in practice and we can take it from there.
>> >
>> > Seth
>> >
>> > On Wed, Nov 10, 2021 at 3:20 AM Arvid Heise <ar...@apache.org> wrote:
>> >
>> > > Hi Thomas,
>> > >
>> > > Could you add a sketch of your preferred solution? From what I gathered,
>> > > you have all the information available in your main (probably 
>> > > misunderstood
>> > > that), so what's keeping you from adding the TypeSerializer as a field to
>> > > your UDF?
>> > >
>> > > On Tue, Nov 9, 2021 at 11:42 AM Krzysztof Chmielewski <
>> > > krzysiek.chmielew...@gmail.com> wrote:
>> > >
>> > >> Hi,
>> > >> In my past project I was able to use Spring as a DI provider for Flink
>> > >> Jobs. It actually saves me a lot of hassle while writing/composing jobs 
>> > >> and
>> > >> process functions.
>> > >> I was able to use all Spring's Bean annotations along with properties
>> > >> files managed by Spring as it would be a "normal" spring app. The
>> > >> dependencies that I was injecting via Spring were not
>> > >> serialized/deserialized by Flink which actually was something that I 
>> > >> wanted
>> > >> to achieved. In some cases it is very hard or maybe even impossible to 
>> > >> make
>> > >> some 3rd party classes serializable.
>> > >>
>> > >> Things to highlight here:
>> > >> 1. I did it only for StreamAPI i think it could work also for TableAPI
>> > >> though.
>> > >> 2.I was loading a Spring context from ProcessFunction::open method.
>> > >> I was able to customize via Job parameters which Spring configuration I
>> > >> want to load.
>> > >> After doing this, all fields annotated with @Autowired were injected.
>> > >> 3, I was using standard @Configuration classes
>> > >>
>> > >> Issues:
>> > >> 1. Since i was using operator::open method to load the context, the
>> > >> context will be loaded few times depends on the number of operators
>> > >> deployed on particular Task Manager. This however could be improved.
>> > >> 2. The important thing here was that all your classes have to be
>> > >> "deployed" on every Task Manager/Job Manager in order to load them 
>> > >> through
>> > >> DI.
>> > >> We achieved this by using what is called "Job session" cluster. Where 
>> > >> our
>> > >> custom Flink docker image was build in a way that it contains our job 
>> > >> jar
>> > >> with all dependencies needed.
>> > >>
>> > >> Because of that, we were not be able to use things like AWS EMR or
>> > >> Kinesis.
>> > >>
>> > >> Cheers,
>> > >> Krzysztof Chmielewski
>> > >>
>> > >> wt., 9 lis 2021 o 06:46 Thomas Weise <t...@apache.org> napisaƂ(a):
>> > >>
>> > >>> Hi,
>> > >>>
>> > >>> I was looking into a problem that requires a configurable type
>> > >>> serializer for communication with a schema registry. The service
>> > >>> endpoint can change, so I would not want to make it part of the
>> > >>> serializer snapshot but rather resolve it at graph construction time
>> > >>> (similar to how a Kafka bootstrap URL or JDBC connection URL would not
>> > >>> be embedded into a checkpoint).
>> > >>>
>> > >>> TypeSerializer is instantiated via either TypeInformation or
>> > >>> TypeSerializerSnapshot. While TypeInformation provides access to
>> > >>> ExecutionConfig and therefore ability to access parameters from
>> > >>> GlobalJobParameters that could be provided through the entry point,
>> > >>> restoreSerializer requires the serializer to be constructed from the
>> > >>> snapshot state alone.
>> > >>>
>> > >>> Ideally there would be a dependency injection mechanism for user code.
>> > >>> Discussion in [1] indicated there isn't a direct solution. Has anyone
>> > >>> come across a similar use case and found a way to work around this
>> > >>> limitation? It might be possible to work with a configuration
>> > >>> singleton that initializes from a file in a well known location, but
>> > >>> that depends on the deployment environment and doesn't play nice with
>> > >>> testing.
>> > >>>
>> > >>> Thanks,
>> > >>> Thomas
>> > >>>
>> > >>> [1] https://lists.apache.org/thread/6qbr4b391dcfwxhcvdl066rpv86gpm5o
>> > >>>
>> > >>
>> >

Reply via email to