Hello,
I am trying to create state for an aggregate function that is used with a
GlobalWindow. This basically looks like:
    savepointWriter.withOperator(
        OperatorIdentifier.forUid(UID),
        OperatorTransformation.bootstrapWith(stateToMigrate)
            .keyBy(...)
            .window(GlobalWindows.create())
            .aggregate(new AggregateFunctionForMigration())
    )
The job runs correctly and writes a savepoint. However, when I then read
that savepoint and load the state for the same UID, the "elements"
iterable in the WindowReaderFunction's readWindow() method has a non-zero
size, but every element in it is null.
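To make the symptom concrete, a check along these lines could be called
from readWindow() with the "elements" iterable. This is only a sketch in
plain Java with no Flink dependency; NullElementCheck and countNulls are
hypothetical names used for illustration, not part of Flink or of my job.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of Flink): counts how many entries of an
// iterable are null, so the result can be logged from readWindow().
class NullElementCheck {

    /** Returns a two-element array: {total number of elements, number of nulls}. */
    static <T> int[] countNulls(Iterable<T> elements) {
        int total = 0;
        int nulls = 0;
        for (T element : elements) {
            total++;
            if (element == null) {
                nulls++;
            }
        }
        return new int[] {total, nulls};
    }

    public static void main(String[] args) {
        // Stand-in for the iterable passed to readWindow(): non-empty, all null.
        List<String> sample = Arrays.asList(null, null, null);
        int[] counts = countNulls(sample);
        System.out.println("total=" + counts[0] + ", nulls=" + counts[1]);
    }
}
```

In the situation described here, both counts would be equal and greater
than zero.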
I've tried specifying a custom trigger between window() and aggregate()
that always returns FIRE or FIRE_AND_PURGE, but it didn't make any
difference.
Am I missing something?
Regards,
Alexis.