Copy of a mail I sent to the user mailing list only:

Raw state can only be used when implementing an operator, not a function.

For functions you have to use Managed Operator State. Your function will have to implement the CheckpointedFunction interface, and create a ValueStateDescriptor that you register in initializeState.

On 24.07.2017 16:26, Boris Lublinsky wrote:
Is there a chance this can be answered?

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

Begin forwarded message:

From: Boris Lublinsky <boris.lublin...@lightbend.com>
Subject: Re: Custom Kryo serializer
Date: July 19, 2017 at 8:28:16 AM CDT
To: user@flink.apache.org, ches...@apache.org

Thanks for the reply, but I am using it for raw state rather than managed state.
In my implementation I have the following:

class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, Double] {

  // The managed keyed state, see https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
  var modelState: ValueState[ModelToServeStats] = _
  var newModelState: ValueState[ModelToServeStats] = _

  // The raw state, see https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state
  var currentModel: Option[Model] = None
  var newModel: Option[Model] = None

Here the current and new model are instances of the trait for which I implemented the serializer. According to the documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state):

“/Raw State/ is state that operators keep in their own data structures. When checkpointed, they only write a sequence of bytes into the checkpoint. Flink knows nothing about the state’s data structures and sees only the raw bytes.”

So I assumed that I need to provide a serializer for this.
Am I missing something?







---------- Forwarded message ----------
From: Chesnay Schepler <ches...@apache.org>
Date: Wed, Jul 19, 2017 at 1:34 PM
Subject: Re: Custom Kryo serializer
To: user@flink.apache.org


Hello,

I assume you're passing the class of your serializer in a StateDescriptor constructor.

If so, you could add a breakpoint in StateDescriptor#initializeSerializerUnlessSet, and check which typeInfo is created and which serializer is created as a result.

One thing you could try right away is registering your serializer for the Model implementations,
instead of the trait.

Regards,
Chesnay


On 14.07.2017 15:50, Boris Lublinsky wrote:
Hi
I have several implementations of my Model trait,

trait Model {
  def score(input: AnyVal): AnyVal
  def cleanup(): Unit
  def toBytes(): Array[Byte]
  def getType: Long
}

None of them is serializable, but they are used in the state definition.
So I implemented a custom serializer:

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.lightbend.model.modeldescriptor.ModelDescriptor


class ModelSerializerKryo extends Serializer[Model] {
  super.setAcceptsNull(false)
  super.setImmutable(true)

  /** Reads bytes and returns a new object of the specified concrete type.
    * <p>
    * Before Kryo can be used to read child objects, {@link Kryo#reference(Object)} must be called with the parent object to
    * ensure it can be referenced by the child objects. Any serializer that uses {@link Kryo} to read a child object may need to
    * be reentrant.
    * <p>
    * This method should not be called directly, instead this serializer can be passed to {@link Kryo} read methods that accept a
    * serializer.
    *
    * @return May be null if {@link #getAcceptsNull()} is true.
    */
  override def read(kryo: Kryo, input: Input, `type`: Class[Model]): Model = {
    import ModelSerializerKryo._

    val mType = input.readLong().toInt
    // Read the payload length first: a -1 byte can occur inside the payload,
    // so it cannot be used as an end marker.
    val bytes = input.readBytes(input.readInt())
    factories.get(mType) match {
      case Some(factory) => factory.restore(bytes)
      case _ => throw new Exception(s"Unknown model type $mType to restore")
    }
  }

  /** Writes the bytes for the object to the output.
    * <p>
    * This method should not be called directly, instead this serializer can be passed to {@link Kryo} write methods that accept a
    * serializer.
    *
    * @param value May be null if {@link #getAcceptsNull()} is true.
    */
  override def write(kryo: Kryo, output: Output, value: Model): Unit = {
    val bytes = value.toBytes
    output.writeLong(value.getType)
    output.writeInt(bytes.length) // length prefix consumed by read
    output.write(bytes)
  }
}

object ModelSerializerKryo {
  private val factories = Map(
    ModelDescriptor.ModelType.PMML.value -> PMMLModel,
    ModelDescriptor.ModelType.TENSORFLOW.value -> TensorFlowModel)
}
And added the following:

// Add custom serializer
env.getConfig.addDefaultKryoSerializer(classOf[Model], classOf[ModelSerializerKryo])

to configure it.
I can see checkpoint messages at the output console, but I never hit a breakpoint in the serializer.
Any suggestions?
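
A side note on the byte layout for a serializer like ModelSerializerKryo, separate from the question of whether Flink ever invokes it: the array returned by toBytes can contain any byte value, including -1, so a reader needs an explicit length to know where the payload ends. The sketch below illustrates that idea with plain java.io streams instead of Kryo's Input and Output. The names Framing, writeModel, and readModel are hypothetical and appear in neither Flink nor Kryo.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical helper (not part of Flink or Kryo). It frames a model as
// [type: Long][length: Int][payload bytes].
object Framing {

  def writeModel(modelType: Long, payload: Array[Byte]): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    out.writeLong(modelType)
    out.writeInt(payload.length) // explicit length instead of an end marker
    out.write(payload)
    out.flush()
    buffer.toByteArray
  }

  def readModel(bytes: Array[Byte]): (Long, Array[Byte]) = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val modelType = in.readLong()
    val length = in.readInt()
    val payload = new Array[Byte](length)
    in.readFully(payload) // reads exactly `length` bytes or throws EOFException
    (modelType, payload)
  }
}
```

With this layout a payload such as Array[Byte](1, -1, 2) round-trips unchanged through Framing.readModel(Framing.writeModel(7L, payload)), whereas a reader that stops at the first -1 byte would return only the first element.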






