Re: Bootstrapping multiple state within same operator

2023-03-24 Thread David Artiga
That sounds like a great hack :D
I'll give it a try for sure. Thank you!
/David


RE: Bootstrapping multiple state within same operator

2023-03-24 Thread Schwalbe Matthias
Hi David,
… coming late to this discussion

We had a very similar problem and I found a simple way to implement priming 
savepoints with mixed keyed/operator state.
The trick is this:

  *   In your KeyedStateBootstrapFunction, also implement CheckpointedFunction.
  *   In initializeState() you can initialize the broadcast state primitive
      (the code skeleton below uses getUnionListState; the principle is the same).
  *   Then, in processElement(), I process one event object per key: a tuple
      holding one collection of state entries per state primitive.
  *   For the union list state I forge a special key “broadcast”; for that key
      only the third vector of the tuple contains anything.
      *   The upstream operator feeding into this bootstrap function makes sure
          only one event with the “broadcast” key is generated.
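The bullets above leave the upstream side implicit. Below is a plain-Scala sketch (no Flink dependency) of what that upstream step might produce: one update per regular key with an empty third vector, plus exactly one update under the forged “broadcast” key that carries only the broadcast entries. The String stand-ins for CSC, DAS and FFR, the StateUpdate case class and the name buildUpdates are all hypothetical; in a real job this logic would live in whatever operator feeds the bootstrap function.

```scala
object BroadcastEventSketch {
  // Hypothetical simplified stand-ins for the redacted CSC, DAS and FFR types.
  type CSC = String
  type DAS = String
  type FFR = String
  type DAFFS = (Vector[CSC], Vector[DAS], Vector[FFR], Vector[Long])

  // Hypothetical stand-in for the StateUpdate type used in the skeleton.
  final case class StateUpdate[K, S](key: K, state: S)

  // The forged key; it must never collide with a real key.
  val BroadcastKey = "broadcast"

  /** One update per regular key (third vector left empty), followed by exactly
    * one update under BroadcastKey that carries only the broadcast entries. */
  def buildUpdates(
      csc: Map[String, Vector[CSC]],
      das: Map[String, Vector[DAS]],
      timers: Map[String, Vector[Long]],
      broadcast: Vector[FFR]
  ): Vector[StateUpdate[String, DAFFS]] = {
    val keys = (csc.keySet ++ das.keySet ++ timers.keySet).toVector.sorted
    require(!keys.contains(BroadcastKey), s"a real key collides with '$BroadcastKey'")

    val keyed = keys.map { k =>
      val state: DAFFS = (
        csc.getOrElse(k, Vector.empty),
        das.getOrElse(k, Vector.empty),
        Vector.empty, // no broadcast entries on regular keys
        timers.getOrElse(k, Vector.empty)
      )
      StateUpdate(k, state)
    }
    val broadcastUpdate =
      StateUpdate[String, DAFFS](BroadcastKey, (Vector.empty, Vector.empty, broadcast, Vector.empty))
    keyed :+ broadcastUpdate
  }
}
```

Because the broadcast entries travel under a single key, keyBy routes them to exactly one parallel instance of the bootstrap function.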

Feel free to peruse the code skeleton (Scala); I removed some parts I’m not
allowed to disclose:


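// Flink 1.16: state processor API plus the checkpointing interfaces
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction

// CSC, DAS, FFR, StateUpdate and ffrTI (the TypeInformation for FFR) are
// defined elsewhere and were removed from this skeleton. In Scala 2 the
// type aliases below must live inside an object or package object.

/** Event type for state updates for savepoint generation. A 4-tuple of
  * 1) a vector of valid [[CSC]] entries
  * 2) a vector of valid [[DAS]] entries
  * 3) a vector of valid broadcast [[FFR]] entries
  * 4) a vector of timers for state cleanup */
type DAFFS = (Vector[CSC], Vector[DAS], Vector[FFR], Vector[Long])

/** [[StateUpdate]] type for [[DAFFS]] state along with the [[String]] key
  * context. */
type DAFFSHU = StateUpdate[String, DAFFS]

class DAFFOperatorStateBootstrapFunction
    extends KeyedStateBootstrapFunction[String, DAFFSHU]
    with CheckpointedFunction {

  // Keyed state handles, set up in open() (descriptors redacted).
  @transient var cSC: ListState[CSC] = _
  @transient var dAS: ListState[DAS] = _

  // Operator (union list) state handle, set up in initializeState().
  @transient var fFRState: ListState[FFR] = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    val rtc: RuntimeContext = getRuntimeContext
    // keyed state setup:
    // cSC = rtc.getListState(new ListStateDescriptor[CSC](...
    // dAS = rtc.getListState(new ListStateDescriptor[DAS](...
  }

  override def processElement(
      value: DAFFSHU,
      ctx: KeyedStateBootstrapFunction[String, DAFFSHU]#Context): Unit = {
    val daffs = value.state
    val ts = ctx.timerService()

    // Keyed state for the current key (both vectors are empty for the "broadcast" key).
    for (csc <- daffs._1) cSC.add(csc)
    for (das <- daffs._2) dAS.add(das)
    // Operator state: only the single "broadcast" event carries entries here.
    for (ffr <- daffs._3) fFRState.add(ffr)
    // Event-time timers for state cleanup, registered for the current key.
    for (timer <- daffs._4) ts.registerEventTimeTimer(timer)
  }

  // Nothing to do: fFRState is managed operator state filled in
  // processElement(), so Flink includes it in the snapshot by itself.
  override def snapshotState(context: FunctionSnapshotContext): Unit = {}

  // Registers the union list state that holds the broadcast FFR entries.
  override def initializeState(context: FunctionInitializationContext): Unit = {
    val fFRStateDescriptor = new ListStateDescriptor[FFR]("ffr", ffrTI)
    fFRState =
      context.getOperatorStateStore.getUnionListState(fFRStateDescriptor)
  }
}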
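Why a single “broadcast” event is enough: per the Flink documentation on operator state, union list state is redistributed on restore by handing every parallel instance the complete list, i.e. the concatenation of the lists written by all subtasks. The plain-Scala model below (hypothetical helper restoreUnion, not Flink code) illustrates that rule: if only the subtask that received the “broadcast” key writes FFR entries, every restored subtask sees them exactly once, whereas writing them from every subtask would multiply them by the parallelism.

```scala
object UnionListRestoreSketch {
  /** Models union redistribution of operator list state: on restore, each of
    * `newParallelism` subtasks receives the concatenation of the lists that
    * all subtasks wrote into the snapshot. */
  def restoreUnion[T](writtenPerSubtask: Vector[Vector[T]], newParallelism: Int): Vector[Vector[T]] = {
    require(newParallelism > 0, "parallelism must be positive")
    val union = writtenPerSubtask.flatten
    Vector.fill(newParallelism)(union)
  }
}
```

With three subtasks and the entries f1, f2 written by one of them, each restored subtask sees exactly f1, f2; if all three had written them, each would see six entries.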

Hope this helps …

Sincere greetings

Thias



Re: Bootstrapping multiple state within same operator

2023-03-22 Thread David Artiga
I'm not familiar with the implementation, but a few options come to mind:

- composable transformations
- underlying MultiMap
- ...
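Hang's note that `SavepointWriter` keeps transformations in a `Map` keyed by `OperatorID` explains the limitation: one slot per operator. Below is a toy plain-Scala model contrasting that with the MultiMap idea. `Transformation`, `SingleRegistry` and `MultiRegistry` are hypothetical names, not Flink API, and the single-slot model rejecting a duplicate id is an assumption made for illustration, not a reading of Flink's code.

```scala
object TransformationRegistrySketch {
  // Hypothetical stand-in for a bootstrap transformation (not a Flink type).
  final case class Transformation(description: String)

  // Single-slot registry modelling a Map keyed by operator id:
  // a second transformation for the same id cannot coexist with the first.
  final case class SingleRegistry(entries: Map[String, Transformation] = Map.empty) {
    def withOperator(uid: String, t: Transformation): Either[String, SingleRegistry] =
      if (entries.contains(uid)) Left(s"uid '$uid' already has a transformation")
      else Right(copy(entries = entries.updated(uid, t)))
  }

  // Multimap registry: every id keeps all of its transformations, in order.
  final case class MultiRegistry(entries: Map[String, Vector[Transformation]] = Map.empty) {
    def withOperator(uid: String, t: Transformation): MultiRegistry =
      copy(entries = entries.updated(uid, entries.getOrElse(uid, Vector.empty) :+ t))
  }
}
```

A multimap would only solve registration; the writer would presumably still have to merge the keyed and operator state produced by each transformation into one operator snapshot.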



Re: Bootstrapping multiple state within same operator

2023-03-22 Thread Hang Ruan
Hi, David,
I also read the code of `SavepointWriter#withOperator`. The
transformations are stored in a `Map` keyed by `OperatorID`, and I can't
come up with a way to register multiple transformations for one
operator with the provided API.

Maybe we need a new type of `XXXStateBootstrapFunction` that can modify
several states at once.

Best,
Hang



Bootstrapping multiple state within same operator

2023-03-22 Thread David Artiga
We are using the state processor API
<https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/libs/state_processor_api/>
to bootstrap the state of some operators. It has been working fine until now,
when we tried to bootstrap an operator that has both a keyed state and a
broadcast state. It seems the API does not provide a convenient method to
apply multiple transformations on the same uid...

Is there a way to do that and we just missed it? Any insights appreciated.

Cheers,
/David