Hi Matthias,

Thank you for looking into this. That change makes the example work, but my
real job still has issues. There is a key difference that might be causing
the problem, but that's not so easy to replicate in the example I made.

Essentially, I'm trying to modify the partition key of an operator, so in
my real job my bootstrap stream comes from a SavepointReader getting data
from an existing UID so that I can key it differently and write it with a
different UID. So far so good; however, I'm also trying to modify my state
POJO at the same time. The equivalent in my example would be adding a
field to the GenericService POJO. I'm guessing this is causing some
inconsistency.

The reason I think this is that, assuming I debugged the right classes, I
can see that the keyGroupIndex that is used when I write the savepoint is
not the same as when I read from it afterward to validate; I can actually
see the state I bootstrapped in one of the state backend's tables, but
since the indices don't correspond, I get an iterator with a null entry.
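
To illustrate what I mean by the indices not corresponding, here is a minimal
sketch of how a key group index depends on the key's hash code and on the max
parallelism. It is a simplification under my own assumptions: the class and
method names are hypothetical, and it uses a plain modulo, whereas Flink's
KeyGroupRangeAssignment additionally scrambles the hash code first.

```java
public class KeyGroupSketch {

    /**
     * Simplified stand-in for key group assignment: maps a key to an index in
     * [0, maxParallelism). Flink's real logic also scrambles the hash code
     * before the modulo; that step is left out here for brevity.
     */
    public static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    public static void main(String[] args) {
        // Same key and same max parallelism on both sides: the indices agree.
        System.out.println(keyGroupFor(200, 128) == keyGroupFor(200, 128));

        // Same key, but the reader assumes a different max parallelism.
        System.out.println(keyGroupFor(200, 128) + " vs " + keyGroupFor(200, 64));

        // Same logical key, but a different key type (Integer vs String),
        // so hashCode() differs and the key lands in another group.
        System.out.println(keyGroupFor(200, 128) + " vs " + keyGroupFor("200", 128));
    }
}
```

If something like this is happening in my job, the writer and the reader
would have to agree on the key type and the max parallelism, but I have not
confirmed that either of them is the actual cause.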

Regards,
Alexis.

On Fri, Aug 2, 2024 at 13:55, Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Alexis,
>
> I've worked it out:
>
> The input of your com.test.Application.StateReader#readWindow(...,
> Iterable<GenericService> elements, ...) is of the projection type
> com.test.Application.AggregateFunctionForMigration:
> AggregateFunction<..., OUT = GenericService>.
>
> I.e. you need to implement
> com.test.Application.AggregateFunctionForMigration#getResult, e.g. as
>
>         @Override
>         public GenericService getResult(GenericService accumulator) {
>             return accumulator;
>         }
>
> If you take a closer look at your call to
> org.apache.flink.state.api.WindowSavepointReader#aggregate(...) you'll see
> that this is indeed the case:
>
>     /**
>      * Reads window state generated using an {@link AggregateFunction}.
>      *
>      * @param uid The uid of the operator.
>      * @param aggregateFunction The aggregate function used to create the window.
>      * @param readerFunction The window reader function.
>      * @param keyType The key type of the window.
>      * @param accType The type information of the accumulator function.
>      * @param outputType The output type of the reader function.
>      * @param <K> The type of the key.
>      * @param <T> The type of the values that are aggregated.
>      * @param <ACC> The type of the accumulator (intermediate aggregate state).
>      * @param <R> The type of the aggregated result.
>      * @param <OUT> The output type of the reader function.
>      * @return A {@code DataStream} of objects read from keyed state.
>      * @throws IOException If savepoint does not contain the specified uid.
>      */
>     public <K, T, ACC, R, OUT> DataStream<OUT> aggregate(
>             String uid,
>             AggregateFunction<T, ACC, R> aggregateFunction,
>             WindowReaderFunction<R, OUT, K, W> readerFunction,
>             TypeInformation<K> keyType,
>             TypeInformation<ACC> accType,
>             TypeInformation<OUT> outputType)
>             throws IOException {
>
> Cheers
>
> Thias
>
> PS: will you come to the Flink Forward conference in October in Berlin (to
> socialize)?
>
> *From:* Alexis Sarda-Espinosa <sarda.espin...@gmail.com>
> *Sent:* Wednesday, July 31, 2024 3:46 PM
> *To:* Schwalbe Matthias <matthias.schwa...@viseca.ch>
> *Cc:* user <user@flink.apache.org>
> *Subject:* Re: Using state processor for a custom windowed aggregate
> function
>
> Hi Matthias,
>
> This does compile, and I am able to generate a savepoint; it's just that
> all the window states in that savepoint appear to be null. The second
> argument of withOperator(...) is specified via
> OperatorTransformation...aggregate(), so the final transformation is built
> by WindowedStateTransformation#aggregate().
>
> I don't have any special logic with timers or even multiple events per
> key. In fact, my "stateToMigrate" already contains a single state instance
> for each key of interest, so my AggregateFunctionForMigration simply
> returns "value" in its add() method, with no other logic there.
>
> Regards,
> Alexis.
>
> On Wed, Jul 31, 2024 at 09:11, Schwalbe Matthias <
> matthias.schwa...@viseca.ch> wrote:
>
> Hi Alexis,
>
> Just a couple of points to double-check:
>
>    - Does your code compile? (The second argument of withOperator(..)
>    should derive from StateBootstrapTransformation<T> instead of
>    SingleOutputStreamOperator<T>.)
>    - The documentation of the savepoint API has examples for each type
>    of state.
>    - Your preparation needs to generate events that, within your
>    StateBootstrapTransformation implementation, get set into state
>    primitives much the same as you would do with a normal streaming
>    operator.
>    - Please note that a savepoint API job always runs in batch mode, hence:
>       - Keyed events are processed in key order first and then in time
>       order.
>       - Triggers will only be fired after all events of a respective key
>       are processed.
>       - Semantics are therefore slightly different from streaming timers.
>
> Hope that helps 😊
>
> Thias
>
> *From:* Alexis Sarda-Espinosa <sarda.espin...@gmail.com>
> *Sent:* Monday, July 29, 2024 9:18 PM
> *To:* user <user@flink.apache.org>
> *Subject:* Using state processor for a custom windowed aggregate function
>
> Hello,
>
> I am trying to create state for an aggregate function that is used with a
> GlobalWindow. This basically looks like:
>
> savepointWriter.withOperator(
>     OperatorIdentifier.forUid(UID),
>     OperatorTransformation.bootstrapWith(stateToMigrate)
>         .keyBy(...)
>         .window(GlobalWindows.create())
>         .aggregate(new AggregateFunctionForMigration())
> )
>
> The job runs correctly and writes a savepoint, but if I then read the
> savepoint I just created and load the state for that UID, the "elements"
> iterable in the WindowReaderFunction's readWindow() method has a non-zero
> size, but every element is null.
>
> I've tried specifying a custom trigger between window() and aggregate(),
> always returning FIRE or FIRE_AND_PURGE, but it didn't make any difference.
>
> Am I missing something?
>
> Regards,
> Alexis.
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>
