In a KeyedCoProcessFunction, I am managing keyed state that consists of
third-party library models. These models are created when new data arrives
on the control stream, within `processElement1`. Because the models are
self-evolving, in the sense that they have their own internal state, I need
to make sure that they are serialized into `modelsBytes` whenever their
state changes. My first attempt goes like this:


```scala
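import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector

import scala.collection.mutable.HashMap

class MyOperator
  extends KeyedCoProcessFunction[String, Control, Data, Prediction]
    with CheckpointedFunction {

  // To hold loaded models
  @transient private var models: HashMap[String, Model] = _

  // For serialization purposes
  @transient private var modelsBytes: MapState[String, Array[Byte]] = _

  // Set in `initializeState` when the job was restored from a checkpoint
  @transient private var needsRestore: Boolean = false

  override def processElement1(
      control: Control,
      ctx: KeyedCoProcessFunction[String, Control, Data, Prediction]#Context,
      out: Collector[Prediction]): Unit = {
    if (needsRestore) {
      restoreModels() // rebuilds `models` from `modelsBytes` (helper not shown)
    }

    // - Create new model out of `control` element
    // - Add it to `models`
  }

  override def processElement2(
      data: Data,
      ctx: KeyedCoProcessFunction[String, Control, Data, Prediction]#Context,
      out: Collector[Prediction]): Unit = {
    if (needsRestore) {
      restoreModels() // rebuilds `models` from `modelsBytes` (helper not shown)
    }

    // - Send `data` element to the corresponding models
    //   This will update their internal states
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // Suspicious, wishful-thinking code that compiles and runs just fine
    for ((k, model) <- models) {
      modelsBytes.put(k, model.toBytes)
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    // Start with an empty map so that `snapshotState` never sees a null
    models = HashMap.empty[String, Model]

    modelsBytes = context.getKeyedStateStore.getMapState(
      new MapStateDescriptor[String, Array[Byte]](
        "modelsBytes", classOf[String], classOf[Array[Byte]])
    )

    if (context.isRestored) needsRestore = true
  }

}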
```

So, the idea is to use `snapshotState` to overwrite the *keyed* state
entries in `modelsBytes`. The reason I am trying this approach is that
serializing the models (`model.toBytes`) might be an expensive operation, so
I would prefer to do it once per model when a checkpoint is taken.

The problem is that this approach might be conceptually wrong. Even though
the code within `snapshotState` compiles and runs just fine, it accesses
keyed state without a keyed context being passed in, so it is not clear at
all which key I am actually working on. I have written a small test to
verify the checkpoints, and I have observed that from time to time I get an
empty state back, even though the `modelsBytes` entries were updated in
`snapshotState`. So it seems that snapshotting my models like this is not
reliable at all.

What confuses me is that the user is perfectly allowed to do this. Maybe the
`put` method should raise an exception to make it clear that a keyed context
is required in the first place; otherwise it gives false hope and might lead
to hard-to-spot bugs. As a matter of fact, shouldn't this be considered a
bug?
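
To make the concern about the missing key concrete, here is a toy model in
plain Scala (no Flink dependency) of a map state whose reads and writes are
scoped to an implicit "current key". `ToyKeyedMapState` and `KeyScopeDemo`
are hypothetical names, and the sketch assumes that outside of
`processElement1`/`processElement2` the current key is simply whatever was
set last. That is my assumption about what happens, not something I have
confirmed in the Flink source.

```scala
import scala.collection.mutable

// Toy stand-in for keyed MapState: every write is scoped to `currentKey`,
// which the runtime is assumed to set before each element is processed.
final class ToyKeyedMapState[K, V] {
  private val perKey = mutable.Map.empty[String, mutable.Map[K, V]]
  private var currentKey: Option[String] = None

  def setCurrentKey(key: String): Unit = { currentKey = Some(key) }

  def put(k: K, v: V): Unit = {
    val key = currentKey.getOrElse(sys.error("no current key set"))
    perKey.getOrElseUpdate(key, mutable.Map.empty[K, V]).update(k, v)
  }

  // Read-only view of everything stored under one key.
  def entries(key: String): Map[K, V] =
    perKey.get(key).map(_.toMap).getOrElse(Map.empty[K, V])
}

object KeyScopeDemo {
  def run(): (Map[String, Int], Map[String, Int]) = {
    val state = new ToyKeyedMapState[String, Int]
    state.setCurrentKey("key-A") // an element for key-A is processed
    state.setCurrentKey("key-B") // an element for key-B is processed

    // "snapshotState": nobody sets a key here, so both writes silently
    // end up under the key that happened to be processed last.
    state.put("model-of-A", 1)
    state.put("model-of-B", 2)

    (state.entries("key-A"), state.entries("key-B"))
  }
}
```

In this toy model, `KeyScopeDemo.run()` leaves "key-A" empty and stores both
models under "key-B". If Flink behaves in a similar way, that would be
consistent with the empty state I occasionally get back.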

The other option I have is, of course, to serialize my models in
`processElement2`, right after sending new data elements to them. However,
continuously serializing my models to update `modelsBytes` might be costly.
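
To put a rough number on that cost, here is a small plain-Scala comparison
(no Flink involved) that counts how often a model gets serialized under each
strategy. `CountingModel` and `SerializationCost` are hypothetical names
made up for this illustration; the real models and their `toBytes` come from
the third-party library.

```scala
// Toy model whose `toBytes` counts how often it is called.
final class CountingModel {
  private var state: Long = 0L
  var serializations: Int = 0

  def update(x: Long): Unit = { state += x }

  def toBytes: Array[Byte] = {
    serializations += 1
    BigInt(state).toByteArray
  }
}

object SerializationCost {
  // Serialize after every element, as the `processElement2` option would.
  def perElement(elements: Seq[Long]): Int = {
    val model = new CountingModel
    elements.foreach { x => model.update(x); model.toBytes }
    model.serializations
  }

  // Serialize once, when the checkpoint is taken.
  def perCheckpoint(elements: Seq[Long]): Int = {
    val model = new CountingModel
    elements.foreach(x => model.update(x))
    model.toBytes
    model.serializations
  }
}
```

With 1000 elements between two checkpoints, `perElement` reports 1000 calls
to `toBytes` for a single model, while `perCheckpoint` reports 1.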

What would be the most efficient way to handle this scenario?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
