Hi all,

When trying to adopt the new (@Experimental) KeyedMultipleInputTransformation, I 
came across the following problem:


  *   In the open(…) function of my operator, which is derived from 
MultipleInputStreamOperator with AbstractStreamOperatorV2, I cannot initialize 
keyed state primitives, because StreamingRuntimeContext#keyedStateStore is not 
properly initialized.
  *   I’ve tracked down the root cause to a difference in the implementation of 
initializeState(…) in
     *   AbstractStreamOperator [1] vs.
     *   AbstractStreamOperatorV2 [2]
     *   The latter is missing a call to: 
runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));

Is this an oversight (bug) or is this intentional?

However, I’ve found a workaround that can be used until this is fixed:

  *   Override initializeState(context: StateInitializationContext) in the 
operator and set the keyed state store on the runtime context there (Scala):

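override def initializeState(context: StateInitializationContext): Unit = {
    super.initializeState(context)
    // TODO: remove once the Flink implementation is fixed
    // Hand the keyed state store to the runtime context, which
    // AbstractStreamOperatorV2#initializeState currently does not do.
    val rtc = getRuntimeContext
    val kss = context.getKeyedStateStore
    rtc.setKeyedStateStore(kss)
}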
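
For illustration, the mechanism can be modelled without a Flink dependency. The sketch below is only a simplified model of the behaviour described above, not Flink code: ToyKeyedStateStore, ToyRuntimeContext, ToyInitContext, ToyOperatorV1, ToyOperatorV2 and PatchedOperatorV2 are hypothetical stand-ins, and the exception type and message are assumptions chosen for the example.

```scala
// Hypothetical stand-ins; these are NOT Flink's classes.
final class ToyKeyedStateStore {
  def getState(name: String): String = s"state:$name"
}

final class ToyRuntimeContext {
  // Stays empty until an operator hands over a store.
  private var keyedStateStore: Option[ToyKeyedStateStore] = None

  def setKeyedStateStore(store: ToyKeyedStateStore): Unit = {
    keyedStateStore = Option(store)
  }

  def getState(name: String): String =
    keyedStateStore
      .getOrElse(throw new IllegalStateException("keyed state store not initialized"))
      .getState(name)
}

final class ToyInitContext(val keyedStateStore: ToyKeyedStateStore)

// Models the first variant: initializeState passes the store to the runtime context.
class ToyOperatorV1 {
  val runtimeContext = new ToyRuntimeContext
  def initializeState(context: ToyInitContext): Unit = {
    runtimeContext.setKeyedStateStore(context.keyedStateStore)
  }
  def open(): String = runtimeContext.getState("counter")
}

// Models the second variant as described above: the hand-over is missing.
class ToyOperatorV2 {
  val runtimeContext = new ToyRuntimeContext
  def initializeState(context: ToyInitContext): Unit = ()
  def open(): String = runtimeContext.getState("counter")
}

// Models the workaround: the subclass performs the missing hand-over itself.
class PatchedOperatorV2 extends ToyOperatorV2 {
  override def initializeState(context: ToyInitContext): Unit = {
    super.initializeState(context)
    runtimeContext.setKeyedStateStore(context.keyedStateStore)
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val ctx = new ToyInitContext(new ToyKeyedStateStore)

    val broken = new ToyOperatorV2
    broken.initializeState(ctx)
    println(scala.util.Try(broken.open()).isFailure) // open() fails: no store was set

    val patched = new PatchedOperatorV2
    patched.initializeState(ctx)
    println(patched.open()) // open() succeeds after the override
  }
}
```

In this model, the only difference between the failing and the working operator is the single setKeyedStateStore call in initializeState, which mirrors the difference between [1] and [2].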


I can create a bug ticket if this is confirmed as a bug 😊

Many thanks

Thias



[1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L287
[2] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java#L231
