Thanks for the reply, but I am not using it for managed state; I am using it for 
raw state. In my implementation I have the following:

class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, Double] {

  // The managed keyed state, see
  // https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
  var modelState: ValueState[ModelToServeStats] = _
  var newModelState: ValueState[ModelToServeStats] = _
  // The raw state, see
  // https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state
  var currentModel : Option[Model] = None
  var newModel : Option[Model] = None

  // ... rest of the class omitted
}

where currentModel and newModel hold instances of the Model trait, for which I 
implemented a serializer.
According to the documentation
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state

“Raw State is state that operators keep in their own data structures. When 
checkpointed, they only write a sequence of bytes into the checkpoint. Flink 
knows nothing about the state’s data structures and sees only the raw bytes.”

So I assumed that I need to provide a serializer for this.
Am I missing something?
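For illustration only, here is a plain-Scala sketch of what writing currentModel and newModel as "a sequence of bytes" could look like. SimpleModel, ConstantModel, RawModelState, write and read are hypothetical names, not Flink API, and nothing in Flink calls them on its own; the layout (presence flag, type tag, length, payload) is an assumption chosen for the example.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.ByteBuffer

// Hypothetical stand-in for the Model trait, reduced to what a snapshot needs.
trait SimpleModel {
  def toBytes(): Array[Byte]
  def getType: Long
}

// Hypothetical concrete model used only for this illustration.
final case class ConstantModel(value: Double) extends SimpleModel {
  def toBytes(): Array[Byte] = ByteBuffer.allocate(8).putDouble(value).array()
  def getType: Long = 1L
}

object RawModelState {
  // Writes both optional models into one opaque byte array. Only this code
  // knows the layout: [present flag][type][length][payload] per model.
  def write(current: Option[SimpleModel], next: Option[SimpleModel]): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    for (model <- Seq(current, next)) model match {
      case Some(m) =>
        val payload = m.toBytes()
        out.writeBoolean(true)
        out.writeLong(m.getType)
        out.writeInt(payload.length)
        out.write(payload)
      case None =>
        out.writeBoolean(false)
    }
    out.flush()
    buffer.toByteArray
  }

  // Reads the layout back in the same order; fromBytes (hypothetical) rebuilds
  // a model from its type tag and payload, e.g. by looking up a factory.
  def read(bytes: Array[Byte], fromBytes: (Long, Array[Byte]) => SimpleModel): (Option[SimpleModel], Option[SimpleModel]) = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    def readOne(): Option[SimpleModel] =
      if (!in.readBoolean()) None
      else {
        val modelType = in.readLong()
        val payload = new Array[Byte](in.readInt())
        in.readFully(payload)
        Some(fromBytes(modelType, payload))
      }
    val current = readOne()
    val next = readOne()
    (current, next)
  }
}
```

With fromBytes = (_, b) => ConstantModel(ByteBuffer.wrap(b).getDouble), reading back write(Some(ConstantModel(0.5)), None) yields (Some(ConstantModel(0.5)), None). The point of the sketch is that the bytes mean nothing outside this code, which matches the documentation's description of raw state.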





Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> 
> ---------- Forwarded message ----------
> From: Chesnay Schepler <ches...@apache.org>
> Date: Wed, Jul 19, 2017 at 1:34 PM
> Subject: Re: Custom Kryo serializer
> To: user@flink.apache.org
> 
> 
> Hello,
> 
> I assume you're passing the class of your serializer in a StateDescriptor 
> constructor.
> 
> If so, you could add a breakpoint in 
> StateDescriptor#initializeSerializerUnlessSet
> and check which typeInfo is created and which serializer is created as a 
> result.
> 
> One thing you could try right away is registering your serializer for the 
> Model implementations,
> instead of the trait.
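To reason about trait vs. implementation registration, here is a simplified, hypothetical model of a default-serializer registry. My understanding (an assumption, not checked against this Kryo version) is that Kryo picks a default serializer by walking its registered defaults in order and taking the first entry whose type the concrete class is assignable to; the sketch models only that rule and is not Kryo's code. Model, PMMLModel and TensorFlowModel here are stand-ins, and DefaultSerializerRegistry is a hypothetical name.

```scala
// Hypothetical stand-ins for the classes in this thread.
trait Model
final class PMMLModel extends Model
final class TensorFlowModel extends Model

// Hypothetical, simplified registry (not Kryo's source): entries are checked in
// registration order and the first registered type that the concrete class is
// assignable to wins.
final class DefaultSerializerRegistry {
  private var entries = Vector.empty[(Class[_], String)]

  def addDefault(tpe: Class[_], serializerName: String): Unit =
    entries = entries :+ ((tpe, serializerName))

  def lookup(concrete: Class[_]): Option[String] =
    entries.collectFirst { case (tpe, name) if tpe.isAssignableFrom(concrete) => name }
}
```

Under that assumption a registration for the trait already matches PMMLModel, while a registration for PMMLModel alone does not cover TensorFlowModel, so registering every concrete class mainly makes the mapping explicit.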
> 
> Regards,
> Chesnay
> 
> 
> On 14.07.2017 15:50, Boris Lublinsky wrote:
>> Hi 
>> I have several implementations of my Model trait:
>> 
>> trait Model {
>>   def score(input : AnyVal) : AnyVal
>>   def cleanup() : Unit
>>   def toBytes() : Array[Byte]
>>   def getType : Long
>> }
>> 
>> None of them is serializable, but they are used in the state definition.
>> So I implemented a custom serializer:
>> 
>> import com.esotericsoftware.kryo.io.{Input, Output}
>> import com.esotericsoftware.kryo.{Kryo, Serializer}
>> import com.lightbend.model.modeldescriptor.ModelDescriptor
>> 
>> 
>> class ModelSerializerKryo extends Serializer[Model] {
>>
>>   super.setAcceptsNull(false)
>>   super.setImmutable(true)
>>
>>   /** Reads bytes and returns a new object of the specified concrete type.
>>     * <p>
>>     * Before Kryo can be used to read child objects, {@link Kryo#reference(Object)}
>>     * must be called with the parent object to ensure it can be referenced by the
>>     * child objects. Any serializer that uses {@link Kryo} to read a child object
>>     * may need to be reentrant.
>>     * <p>
>>     * This method should not be called directly, instead this serializer can be
>>     * passed to {@link Kryo} read methods that accept a serializer.
>>     *
>>     * @return May be null if {@link #getAcceptsNull()} is true. */
>>   override def read(kryo: Kryo, input: Input, `type`: Class[Model]): Model = {
>>
>>     import ModelSerializerKryo._
>>
>>     val mType = input.readLong().toInt
>>     // Read exactly as many bytes as write() recorded. Kryo's readByte() throws
>>     // at the end of the input instead of returning -1, and a payload byte of
>>     // 0xFF would otherwise end the read early.
>>     val length = input.readInt()
>>     val bytes = input.readBytes(length)
>>     factories.get(mType) match {
>>       case Some(factory) => factory.restore(bytes)
>>       case _ => throw new Exception(s"Unknown model type $mType to restore")
>>     }
>>   }
>>
>>   /** Writes the bytes for the object to the output.
>>     * <p>
>>     * This method should not be called directly, instead this serializer can be
>>     * passed to {@link Kryo} write methods that accept a serializer.
>>     *
>>     * @param value May be null if {@link #getAcceptsNull()} is true. */
>>   override def write(kryo: Kryo, output: Output, value: Model): Unit = {
>>     output.writeLong(value.getType)
>>     // Prefix the payload with its length so read() knows where it ends.
>>     val bytes = value.toBytes()
>>     output.writeInt(bytes.length)
>>     output.writeBytes(bytes)
>>   }
>> }
>> 
>> object ModelSerializerKryo{
>>   private val factories = Map(
>>     ModelDescriptor.ModelType.PMML.value -> PMMLModel,
>>     ModelDescriptor.ModelType.TENSORFLOW.value -> TensorFlowModel)
>> }
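Ending a model's bytes at a sentinel value (for example, stopping at the first -1) is fragile, because a serialized payload can legitimately contain the byte 0xFF, which reads back as -1. The plain-Scala sketch below compares sentinel framing with length-prefix framing; FramingDemo and its methods are hypothetical names, and java.io streams stand in for Kryo's Input and Output.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object FramingDemo {
  // Sentinel framing: keep bytes until the first -1 (0xFF).
  def readUntilSentinel(bytes: Array[Byte]): Array[Byte] =
    bytes.takeWhile(_ != -1)

  // Length-prefix framing: write the payload size first, then the payload.
  def writeLengthPrefixed(payload: Array[Byte]): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    out.writeInt(payload.length)
    out.write(payload)
    out.flush()
    buffer.toByteArray
  }

  // Reads the size, then exactly that many bytes.
  def readLengthPrefixed(framed: Array[Byte]): Array[Byte] = {
    val in = new DataInputStream(new ByteArrayInputStream(framed))
    val payload = new Array[Byte](in.readInt())
    in.readFully(payload)
    payload
  }
}
```

For the payload Array[Byte](1, -1, 2), readUntilSentinel keeps only the first byte, while the length-prefixed round trip returns all three bytes.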
>> And added the following to configure it:
>> 
>> // Add custom serializer
>> env.getConfig.addDefaultKryoSerializer(classOf[Model], classOf[ModelSerializerKryo])
>> 
>> I can see checkpoint messages in the console output, but I never hit a 
>> breakpoint in the serializer.
>> Any suggestions?
>>  
>> 
>> 
>> 
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com
>> https://www.lightbend.com/
> 
> 
