Hi Theo,

This is indeed a tricky feature to test!

On Thu, Feb 20, 2020 at 8:59 PM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi,
>
> We have a pipeline which internally uses Java POJOs and also needs to keep
> some events entirely in state for some time.
>
> From time to time, our POJOs evolve, like attributes are added or removed.
>
> Now I wanted to write a E2E test that proves the schema migration works
> (Having different schemas in source kafka topic, flink pipeline state and
> sink) for bounded scenarios (attribute added or removed)
>
> I figured out that in my test, I can instantiate a
> MiniClusterWithClientResource, receive a client, start a job over the
> client and also cancel the job with a savepoint. My idea was to start the
> job, put some records in, cancel with a savepoint and restart the job from
> savepoint, but with a slightly different POJO (added another attribute and
> removed an existing one).
>
> Currently, I'm sadly missing two pieces:
> 1. I don't see a way to restart a job from savepoint via the client
> obtained from the MiniClusterWithClientResource in my test
> 2. According to a flink blog post [1],schema evolution of POJOs is more
> limited, especially the evolved POJO must have the same "namespace" (i.e.
> java package?!) and class name.
>

The way Flink's own schema / serializer evolution tests work around this is
to define two different classes (with different class names) and reload them
in new classloaders so that they can be "relocated" to the same name at
runtime.
In Flink, we use a `ClassRelocator` utility for this. You can check out
example usages of it in `PojoSerializerUpgradeTest` and
`TypeSerializerUpgradeTestBase`.
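The underlying trick can be sketched without Flink's utility: a classloader
that defines a class itself, instead of delegating to its parent, yields a
distinct `Class` object on every load, which is what lets two structurally
different definitions coexist under the same name at runtime. A minimal,
Flink-free sketch (all names here are illustrative, not Flink API):

```java
import java.io.InputStream;

public class ClassLoaderDemo {
    // Stand-in POJO; in a real test this would be the old / new schema class.
    public static class Pojo {
        public int id;
    }

    // Defines Pojo itself rather than delegating to the parent, so every
    // IsolatingLoader instance produces a distinct Class object for it.
    static class IsolatingLoader extends ClassLoader {
        @Override
        protected Class<?> loadClass(String name, boolean resolve)
                throws ClassNotFoundException {
            if (name.equals(Pojo.class.getName())) {
                try (InputStream in = ClassLoaderDemo.class.getResourceAsStream(
                        "/" + name.replace('.', '/') + ".class")) {
                    byte[] bytes = in.readAllBytes();
                    return defineClass(name, bytes, 0, bytes.length);
                } catch (Exception e) {
                    throw new ClassNotFoundException(name, e);
                }
            }
            // Everything else (java.lang.Object, ...) delegates as usual.
            return super.loadClass(name, resolve);
        }
    }

    public static void main(String[] args) throws Exception {
        Class<?> a = new IsolatingLoader().loadClass(Pojo.class.getName());
        Class<?> b = new IsolatingLoader().loadClass(Pojo.class.getName());
        System.out.println(a.getName().equals(b.getName())); // true: same name
        System.out.println(a == b); // false: distinct runtime classes
    }
}
```

Load the "old" definition before taking the savepoint and the "new" one
before restoring, and the serializer sees the same class name with a
different shape.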

I'm not entirely sure if it would work in your scenario, but I think it's
worth giving it a try since it'll make writing such tests easier.

If that doesn't work, you could instead split the old / new POJO definitions
into separate modules (i.e. jars), plus a separate module containing the
actual test logic, which loads the jars with different classloaders.
That would more closely resemble what happens in reality.


> Especially point 2 seems to make it impossible for me to automate testing
> of the evolution, but need to do it manually.
>
> Do you have any idea how I could overcome these limitations so that I can
> build a proper end to end test for the schema migration to work?
>
> Best regards
> Theo
>
> [1]
> https://flink.apache.org/news/2020/01/29/state-unlocked-interacting-with-state-in-apache-flink.html
>

Hope that helps! Would be great to hear back from you on how it works out.

Cheers,
Gordon
