Thanks Chesnay,
Can you please point me to an example?

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> On Jul 24, 2017, at 9:27 AM, Chesnay Schepler <ches...@apache.org> wrote:
> 
> Copy of a mail I sent only to the user mailing list:
> 
> Raw state can only be used when implementing an operator, not a function.
> 
> For functions you have to use Managed Operator State. Your function will have
> to implement the CheckpointedFunction interface and create a ValueStateDescriptor
> that you register in initializeState.
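In DataProcessorKeyed further down this thread, `currentModel` and `newModel` are plain Scala fields. Flink does not put plain fields of a function into a checkpoint, which would likely explain why the serializer breakpoint is never hit. Below is a minimal sketch of the approach described above, assuming the Flink 1.3 Scala/Java streaming API on the classpath. `ModelStateFunction`, `modelForCurrentKey` and `setModel` are hypothetical names; `Model` is the trait from this thread. Note that a ValueState obtained this way is keyed state and only works when both inputs are keyed; non-keyed operator state is exposed as ListState instead.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.co.CoProcessFunction

// The thread's Model trait, repeated so this sketch compiles on its own.
trait Model {
  def score(input: AnyVal): AnyVal
  def cleanup(): Unit
  def toBytes(): Array[Byte]
  def getType: Long
}

// Hypothetical base class: the current model is Flink-managed keyed state
// instead of a plain `var currentModel: Option[Model]`.
// Subclasses still implement processElement1 and processElement2.
abstract class ModelStateFunction[IN1, IN2, OUT]
    extends CoProcessFunction[IN1, IN2, OUT]
    with CheckpointedFunction {

  // Only a handle; the value itself lives in Flink's state backend.
  @transient protected var currentModel: ValueState[Model] = _

  override def initializeState(context: FunctionInitializationContext): Unit = {
    // Model is a trait, so Flink is expected to fall back to Kryo for it,
    // which is where addDefaultKryoSerializer registrations apply.
    val descriptor = new ValueStateDescriptor[Model]("currentModel", classOf[Model])
    // Keyed state: requires keyed inputs, e.g. connect(...).keyBy(...).
    currentModel = context.getKeyedStateStore.getState(descriptor)
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // Nothing to copy: Flink snapshots registered keyed state itself.
  }

  // value() returns null for a key that has no model yet.
  protected def modelForCurrentKey: Option[Model] = Option(currentModel.value())

  protected def setModel(model: Model): Unit = currentModel.update(model)
}
```

A subclass would call `modelForCurrentKey` in processElement1 to score records and `setModel` in processElement2 when a new model arrives. For non-keyed operator state, the counterpart is a ListStateDescriptor registered through `context.getOperatorStateStore.getListState`, with the list refreshed in snapshotState.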
> 
> On 24.07.2017 16:26, Boris Lublinsky wrote:
>> Is there a chance this can be answered?
>> 
>>> Begin forwarded message:
>>> 
>>> From: Boris Lublinsky <boris.lublin...@lightbend.com>
>>> Subject: Re: Custom Kryo serializer
>>> Date: July 19, 2017 at 8:28:16 AM CDT
>>> To: user@flink.apache.org, ches...@apache.org
>>> 
>>> Thanks for the reply, but I am not using it for managed state; I am using it
>>> for raw state. In my implementation I have the following:
>>> 
>>> class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, Double] {
>>> 
>>>   // The managed keyed state, see
>>>   // https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
>>>   var modelState: ValueState[ModelToServeStats] = _
>>>   var newModelState: ValueState[ModelToServeStats] = _
>>>   // The raw state, see
>>>   // https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state
>>>   var currentModel: Option[Model] = None
>>>   var newModel: Option[Model] = None
>>>   // ...
>>> }
>>> 
>>> where currentModel and newModel are instances of the trait for which I
>>> implemented the serializer.
>>> According to the documentation,
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state
>>> 
>>> “Raw State is state that operators keep in their own data structures. When 
>>> checkpointed, they only write a sequence of bytes into the checkpoint. 
>>> Flink knows nothing about the state’s data structures and sees only the raw 
>>> bytes.”
>>> 
>>> So I assumed that I needed to provide a serializer for it.
>>> Am I missing something?
>>>> 
>>>> ---------- Forwarded message ----------
>>>> From: Chesnay Schepler <ches...@apache.org>
>>>> Date: Wed, Jul 19, 2017 at 1:34 PM
>>>> Subject: Re: Custom Kryo serializer
>>>> To: user@flink.apache.org
>>>> 
>>>> 
>>>> Hello,
>>>> 
>>>> I assume you're passing the class of your serializer in a StateDescriptor 
>>>> constructor.
>>>> 
>>>> If so, you could add a breakpoint in StateDescriptor#initializeSerializerUnlessSet
>>>> and check what TypeInformation is created and which serializer is created as a
>>>> result.
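As an alternative to the breakpoint, the serializer a descriptor resolves to can be printed directly. A minimal sketch, assuming the Flink 1.3 flink-core API on the classpath; `InspectStateSerializer` and `serializerFor` are hypothetical names. The Kryo registration is left commented out because ModelSerializerKryo depends on the thread's ModelDescriptor types. For a trait such as Model, the expected result is a KryoSerializer, the serializer that consults addDefaultKryoSerializer registrations.

```scala
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.typeutils.TypeSerializer

// The thread's Model trait, repeated so this sketch compiles on its own.
trait Model {
  def score(input: AnyVal): AnyVal
  def cleanup(): Unit
  def toBytes(): Array[Byte]
  def getType: Long
}

object InspectStateSerializer {
  // Builds the descriptor the way a function would, then lets it derive
  // TypeInformation and create the serializer from the given config.
  def serializerFor(config: ExecutionConfig): TypeSerializer[Model] = {
    val descriptor = new ValueStateDescriptor[Model]("currentModel", classOf[Model])
    descriptor.initializeSerializerUnlessSet(config)
    descriptor.getSerializer
  }

  def main(args: Array[String]): Unit = {
    val config = new ExecutionConfig()
    // In the real job:
    // config.addDefaultKryoSerializer(classOf[Model], classOf[ModelSerializerKryo])
    println(serializerFor(config).getClass.getName)
  }
}
```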
>>>> 
>>>> One thing you could try right away is registering your serializer for the
>>>> Model implementations instead of the trait.
>>>> 
>>>> Regards,
>>>> Chesnay
>>>> 
>>>> 
>>>> On 14.07.2017 15:50, Boris Lublinsky wrote:
>>>>> Hi,
>>>>> I have several implementations of my Model trait:
>>>>> 
>>>>> trait Model {
>>>>>   def score(input : AnyVal) : AnyVal
>>>>>   def cleanup() : Unit
>>>>>   def toBytes() : Array[Byte]
>>>>>   def getType : Long
>>>>> }
>>>>> 
>>>>> None of them is serializable, but they are used in the state definition,
>>>>> so I implemented a custom serializer:
>>>>> 
>>>>> import com.esotericsoftware.kryo.io.{Input, Output}
>>>>> import com.esotericsoftware.kryo.{Kryo, Serializer}
>>>>> import com.lightbend.model.modeldescriptor.ModelDescriptor
>>>>> 
>>>>> class ModelSerializerKryo extends Serializer[Model] {
>>>>> 
>>>>>   setAcceptsNull(false)
>>>>>   setImmutable(true)
>>>>> 
>>>>>   /** Reads the model type, then a length-prefixed payload, and restores
>>>>>     * the concrete Model with the factory registered for that type.
>>>>>     * Called by Kryo; should not be called directly. */
>>>>>   override def read(kryo: Kryo, input: Input, `type`: Class[Model]): Model = {
>>>>>     import ModelSerializerKryo._
>>>>> 
>>>>>     val mType = input.readLong().toInt
>>>>>     // Read exactly as many bytes as write() produced. Scanning for a -1
>>>>>     // byte instead would stop early on any 0xFF byte inside the payload.
>>>>>     val length = input.readInt()
>>>>>     val bytes = input.readBytes(length)
>>>>>     factories.get(mType) match {
>>>>>       case Some(factory) => factory.restore(bytes)
>>>>>       case _ => throw new Exception(s"Unknown model type $mType to restore")
>>>>>     }
>>>>>   }
>>>>> 
>>>>>   /** Writes the model type, the payload length and the payload bytes.
>>>>>     * Called by Kryo; should not be called directly. */
>>>>>   override def write(kryo: Kryo, output: Output, value: Model): Unit = {
>>>>>     val bytes = value.toBytes()
>>>>>     output.writeLong(value.getType)
>>>>>     output.writeInt(bytes.length)
>>>>>     output.writeBytes(bytes)
>>>>>   }
>>>>> }
>>>>> 
>>>>> object ModelSerializerKryo {
>>>>>   private val factories = Map(
>>>>>     ModelDescriptor.ModelType.PMML.value -> PMMLModel,
>>>>>     ModelDescriptor.ModelType.TENSORFLOW.value -> TensorFlowModel)
>>>>> }
>>>>> And added the following to configure it:
>>>>> 
>>>>> // Add custom serializer
>>>>> env.getConfig.addDefaultKryoSerializer(classOf[Model], classOf[ModelSerializerKryo])
>>>>> 
>>>>> I can see checkpoint messages in the console output, but I never hit a
>>>>> breakpoint in the serializer.
>>>>> Any suggestions?
>>>> 
>>>> 
>> 
> 