Is there a chance, this can be answered?

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> Begin forwarded message:
> 
> From: Boris Lublinsky <boris.lublin...@lightbend.com>
> Subject: Re: Custom Kryo serializer
> Date: July 19, 2017 at 8:28:16 AM CDT
> To: user@flink.apache.org, ches...@apache.org
> 
> Thanks for the reply, but I am not using it for managed state, but rather for 
> the raw state
> In my implementation I have the following
> 
> class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, 
> Double]{
> 
>   // The managed keyed state see 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html>
>   var modelState: ValueState[ModelToServeStats] = _
>   var newModelState: ValueState[ModelToServeStats] = _
>   // The raw state - 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state>
>   var currentModel : Option[Model] = None
>   var newModel : Option[Model] = None
> 
> Where current and new model are instances of the trait for which I implement 
> serializer
> According to documentation 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state>
> 
> “Raw State is state that operators keep in their own data structures. When 
> checkpointed, they only write a sequence of bytes into the checkpoint. Flink 
> knows nothing about the state’s data structures and sees only the raw bytes.”
> 
> So I was assuming that I need to provide serializer for this.
> Am I missing something?
> 
> 
> 
> 
> 
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com <mailto:boris.lublin...@lightbend.com>
> https://www.lightbend.com/
> 
>> 
>> ---------- Forwarded message ----------
>> From: Chesnay Schepler <ches...@apache.org <mailto:ches...@apache.org>>
>> Date: Wed, Jul 19, 2017 at 1:34 PM
>> Subject: Re: Custom Kryo serializer
>> To: user@flink.apache.org <mailto:user@flink.apache.org>
>> 
>> 
>> Hello,
>> 
>> I assume you're passing the class of your serializer in a StateDescriptor 
>> constructor.
>> 
>> If so, you could add a breakpoint in 
>> Statedescriptor#initializeSerializerUnlessSet,
>> and check what typeInfo is created and which serializer is created as a 
>> result.
>> 
>> One thing you could try right away is registering your serializer for the 
>> Model implementations,
>> instead of the trait.
>> 
>> Regards,
>> Chesnay
>> 
>> 
>> On 14.07.2017 15:50, Boris Lublinsky wrote:
>>> Hi 
>>> I have several implementations of my Model trait, 
>>> 
>>> trait Model {
>>>   def score(input : AnyVal) : AnyVal
>>>   def cleanup() : Unit
>>>   def toBytes() : Array[Byte]
>>>   def getType : Long
>>> }
>>> 
>>> neither one of them are serializable, but are used in the state definition.
>>> So I implemented custom serializer
>>> 
>>> import com.esotericsoftware.kryo.io 
>>> <http://com.esotericsoftware.kryo.io/>.{Input, Output}
>>> import com.esotericsoftware.kryo.{Kryo, Serializer}
>>> import com.lightbend.model.modeldescriptor.ModelDescriptor
>>> 
>>> 
>>> class ModelSerializerKryo extends Serializer[Model]{
>>>   
>>>   super.setAcceptsNull(false)
>>>   super.setImmutable(true)
>>> 
>>>   /** Reads bytes and returns a new object of the specified concrete type.
>>>     * <p>
>>>     * Before Kryo can be used to read child objects, {@link 
>>> Kryo#reference(Object)} must be called with the parent object to
>>>     * ensure it can be referenced by the child objects. Any serializer that 
>>> uses {@link Kryo} to read a child object may need to
>>>     * be reentrant.
>>>     * <p>
>>>     * This method should not be called directly, instead this serializer 
>>> can be passed to {@link Kryo} read methods that accept a
>>>     * serialier.
>>>     *
>>>     * @return May be null if { @link #getAcceptsNull()} is true. */
>>>   
>>>   override def read(kryo: Kryo, input: Input, `type`: Class[Model]): Model 
>>> = {
>>> 
>>>     import ModelSerializerKryo._
>>> 
>>>     val mType = input.readLong().asInstanceOf[Int]
>>>     val bytes = Stream.continually(input.readByte()).takeWhile(_ != 
>>> -1).toArray
>>>     factories.get(mType) match {
>>>       case Some(factory) => factory.restore(bytes)
>>>       case _ => throw new Exception(s"Unknown model type $mType to restore")
>>>     }
>>>   }
>>> 
>>>   /** Writes the bytes for the object to the output.
>>>     * <p>
>>>     * This method should not be called directly, instead this serializer 
>>> can be passed to {@link Kryo} write methods that accept a
>>>     * serialier.
>>>     *
>>>     * @param value May be null if { @link #getAcceptsNull()} is true. */
>>>   
>>>   override def write(kryo: Kryo, output: Output, value: Model): Unit = {
>>>     output.writeLong(value.getType)
>>>     output.write(value.toBytes)
>>>   }
>>> }
>>> 
>>> object ModelSerializerKryo{
>>>   private val factories = Map(ModelDescriptor.ModelType.PMML.value -> 
>>> PMMLModel,
>>>     ModelDescriptor.ModelType.TENSORFLOW.value -> TensorFlowModel)
>>> }
>>> And added the following 
>>> 
>>> // Add custom serializer
>>> env.getConfig.addDefaultKryoSerializer(classOf[Model], 
>>> classOf[ModelSerializerKryo])
>>> 
>>> To configure it.
>>> I can see checkpoint messages at the output console, but I never hist a 
>>> break point in serializer.
>>> Any suggestions?
>>>  
>>> 
>>> 
>>> 
>>> Boris Lublinsky
>>> FDP Architect
>>> boris.lublin...@lightbend.com <mailto:boris.lublin...@lightbend.com>
>>> https://www.lightbend.com/ <https://www.lightbend.com/>
>> 
>> 
> 
> Begin forwarded message:
> 
> From: Boris Lublinsky <boris.lublin...@lightbend.com>
> Subject: Custom Kryo serializer
> Date: July 14, 2017 at 8:50:22 AM CDT
> To: user@flink.apache.org
> 
> Hi 
> I have several implementations of my Model trait, 
> 
> trait Model {
>   def score(input : AnyVal) : AnyVal
>   def cleanup() : Unit
>   def toBytes() : Array[Byte]
>   def getType : Long
> }
> 
> neither one of them are serializable, but are used in the state definition.
> So I implemented custom serializer
> 
> import com.esotericsoftware.kryo.io 
> <http://com.esotericsoftware.kryo.io/>.{Input, Output}
> import com.esotericsoftware.kryo.{Kryo, Serializer}
> import com.lightbend.model.modeldescriptor.ModelDescriptor
> 
> 
> class ModelSerializerKryo extends Serializer[Model]{
>   
>   super.setAcceptsNull(false)
>   super.setImmutable(true)
> 
>   /** Reads bytes and returns a new object of the specified concrete type.
>     * <p>
>     * Before Kryo can be used to read child objects, {@link 
> Kryo#reference(Object)} must be called with the parent object to
>     * ensure it can be referenced by the child objects. Any serializer that 
> uses {@link Kryo} to read a child object may need to
>     * be reentrant.
>     * <p>
>     * This method should not be called directly, instead this serializer can 
> be passed to {@link Kryo} read methods that accept a
>     * serialier.
>     *
>     * @return May be null if { @link #getAcceptsNull()} is true. */
>   
>   override def read(kryo: Kryo, input: Input, `type`: Class[Model]): Model = {
> 
>     import ModelSerializerKryo._
> 
>     val mType = input.readLong().asInstanceOf[Int]
>     val bytes = Stream.continually(input.readByte()).takeWhile(_ != 
> -1).toArray
>     factories.get(mType) match {
>       case Some(factory) => factory.restore(bytes)
>       case _ => throw new Exception(s"Unknown model type $mType to restore")
>     }
>   }
> 
>   /** Writes the bytes for the object to the output.
>     * <p>
>     * This method should not be called directly, instead this serializer can 
> be passed to {@link Kryo} write methods that accept a
>     * serialier.
>     *
>     * @param value May be null if { @link #getAcceptsNull()} is true. */
>   
>   override def write(kryo: Kryo, output: Output, value: Model): Unit = {
>     output.writeLong(value.getType)
>     output.write(value.toBytes)
>   }
> }
> 
> object ModelSerializerKryo{
>   private val factories = Map(ModelDescriptor.ModelType.PMML.value -> 
> PMMLModel,
>     ModelDescriptor.ModelType.TENSORFLOW.value -> TensorFlowModel)
> }
> And added the following 
> 
> // Add custom serializer
> env.getConfig.addDefaultKryoSerializer(classOf[Model], 
> classOf[ModelSerializerKryo])
> 
> To configure it.
> I can see checkpoint messages at the output console, but I never hist a break 
> point in serializer.
> Any suggestions?
>  
> 
> 
> 
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com <mailto:boris.lublin...@lightbend.com>
> https://www.lightbend.com/
> 

Reply via email to