Thanks Chesnay,
Could you please point me to an example?

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

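For reference, a minimal sketch of what the reply quoted below describes: a function that implements CheckpointedFunction and registers managed operator state in initializeState. It assumes the Flink 1.3 APIs. The class name ModelHolder, the state name "model-bytes", and the choice to keep the model as a plain byte array are hypothetical and only for illustration. The sketch uses a ListStateDescriptor rather than the ValueStateDescriptor mentioned in the reply, on the assumption that the operator state store hands out list-style state; treat that as something to verify against the Flink documentation.

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction

import scala.collection.JavaConverters._

// Hypothetical function: keeps the latest serialized model in managed operator state.
class ModelHolder extends MapFunction[Array[Byte], Int] with CheckpointedFunction {

  // Live copy used while processing; it is only persisted through snapshotState.
  private var currentModelBytes: Option[Array[Byte]] = None

  // Handle to the managed operator state, assigned in initializeState.
  @transient private var checkpointedModel: ListState[Array[Byte]] = _

  // Hypothetical processing step: remember the latest model bytes, emit their length.
  override def map(modelBytes: Array[Byte]): Int = {
    currentModelBytes = Some(modelBytes)
    modelBytes.length
  }

  // Called on every checkpoint: copy the live value into the managed state.
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointedModel.clear()
    currentModelBytes.foreach(bytes => checkpointedModel.add(bytes))
  }

  // Called on start and on recovery: register the state, then restore from it if possible.
  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor =
      new ListStateDescriptor[Array[Byte]]("model-bytes", classOf[Array[Byte]])
    checkpointedModel = context.getOperatorStateStore.getListState(descriptor)
    currentModelBytes =
      if (context.isRestored) Option(checkpointedModel.get()).flatMap(_.asScala.headOption)
      else None
  }
}
```

With this shape the model itself never has to be serializable; only the bytes produced by something like toBytes are handed to Flink.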
> On Jul 24, 2017, at 9:27 AM, Chesnay Schepler <ches...@apache.org> wrote:
>
> Copy of a mail I sent to the user mailing list only:
>
> Raw state can only be used when implementing an operator, not a function.
>
> For functions you have to use Managed Operator State. Your function will have
> to implement the CheckpointedFunction interface, and create a
> ValueStateDescriptor that you register in initializeState.
>
> On 24.07.2017 16:26, Boris Lublinsky wrote:
>> Is there a chance this can be answered?
>>
>>> Begin forwarded message:
>>>
>>> From: Boris Lublinsky <boris.lublin...@lightbend.com>
>>> Subject: Re: Custom Kryo serializer
>>> Date: July 19, 2017 at 8:28:16 AM CDT
>>> To: user@flink.apache.org, ches...@apache.org
>>>
>>> Thanks for the reply, but I am not using it for managed state, but rather
>>> for the raw state.
>>> In my implementation I have the following:
>>>
>>> class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, Double]{
>>>
>>>   // The managed keyed state, see
>>>   // https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
>>>   var modelState: ValueState[ModelToServeStats] = _
>>>   var newModelState: ValueState[ModelToServeStats] = _
>>>   // The raw state, see
>>>   // https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state
>>>   var currentModel : Option[Model] = None
>>>   var newModel : Option[Model] = None
>>>
>>> Where current and new model are instances of the trait for which I
>>> implement serializer.
>>> According to the documentation
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state
>>>
>>> “Raw State is state that operators keep in their own data structures. When
>>> checkpointed, they only write a sequence of bytes into the checkpoint.
>>> Flink knows nothing about the state’s data structures and sees only the raw
>>> bytes.”
>>>
>>> So I was assuming that I need to provide a serializer for this.
>>> Am I missing something?
>>>
>>>> ---------- Forwarded message ----------
>>>> From: Chesnay Schepler <ches...@apache.org>
>>>> Date: Wed, Jul 19, 2017 at 1:34 PM
>>>> Subject: Re: Custom Kryo serializer
>>>> To: user@flink.apache.org
>>>>
>>>> Hello,
>>>>
>>>> I assume you're passing the class of your serializer in a StateDescriptor
>>>> constructor.
>>>>
>>>> If so, you could add a breakpoint in
>>>> StateDescriptor#initializeSerializerUnlessSet,
>>>> and check what typeInfo is created and which serializer is created as a
>>>> result.
>>>>
>>>> One thing you could try right away is registering your serializer for the
>>>> Model implementations, instead of the trait.
>>>>
>>>> Regards,
>>>> Chesnay
>>>>
>>>> On 14.07.2017 15:50, Boris Lublinsky wrote:
>>>>> Hi
>>>>> I have several implementations of my Model trait,
>>>>>
>>>>> trait Model {
>>>>>   def score(input : AnyVal) : AnyVal
>>>>>   def cleanup() : Unit
>>>>>   def toBytes() : Array[Byte]
>>>>>   def getType : Long
>>>>> }
>>>>>
>>>>> None of them is serializable, but they are used in the state
>>>>> definition.
>>>>> So I implemented a custom serializer
>>>>>
>>>>> import com.esotericsoftware.kryo.io.{Input, Output}
>>>>> import com.esotericsoftware.kryo.{Kryo, Serializer}
>>>>> import com.lightbend.model.modeldescriptor.ModelDescriptor
>>>>>
>>>>>
>>>>> class ModelSerializerKryo extends Serializer[Model]{
>>>>>
>>>>>   super.setAcceptsNull(false)
>>>>>   super.setImmutable(true)
>>>>>
>>>>>   /** Reads bytes and returns a new object of the specified concrete type.
>>>>>    * <p>
>>>>>    * Before Kryo can be used to read child objects, {@link Kryo#reference(Object)} must be called with the parent object to
>>>>>    * ensure it can be referenced by the child objects. Any serializer that uses {@link Kryo} to read a child object may need to
>>>>>    * be reentrant.
>>>>>    * <p>
>>>>>    * This method should not be called directly, instead this serializer can be passed to {@link Kryo} read methods that accept a
>>>>>    * serializer.
>>>>>    *
>>>>>    * @return May be null if {@link #getAcceptsNull()} is true. */
>>>>>
>>>>>   override def read(kryo: Kryo, input: Input, `type`: Class[Model]): Model = {
>>>>>
>>>>>     import ModelSerializerKryo._
>>>>>
>>>>>     val mType = input.readLong().toInt
>>>>>     val bytes = Stream.continually(input.readByte()).takeWhile(_ != -1).toArray
>>>>>     factories.get(mType) match {
>>>>>       case Some(factory) => factory.restore(bytes)
>>>>>       case _ => throw new Exception(s"Unknown model type $mType to restore")
>>>>>     }
>>>>>   }
>>>>>
>>>>>   /** Writes the bytes for the object to the output.
>>>>>    * <p>
>>>>>    * This method should not be called directly, instead this serializer can be passed to {@link Kryo} write methods that accept a
>>>>>    * serializer.
>>>>>    *
>>>>>    * @param value May be null if {@link #getAcceptsNull()} is true.
>>>>>    */
>>>>>
>>>>>   override def write(kryo: Kryo, output: Output, value: Model): Unit = {
>>>>>     output.writeLong(value.getType)
>>>>>     output.write(value.toBytes)
>>>>>   }
>>>>> }
>>>>>
>>>>> object ModelSerializerKryo{
>>>>>   private val factories = Map(ModelDescriptor.ModelType.PMML.value -> PMMLModel,
>>>>>     ModelDescriptor.ModelType.TENSORFLOW.value -> TensorFlowModel)
>>>>> }
>>>>> And added the following
>>>>>
>>>>> // Add custom serializer
>>>>> env.getConfig.addDefaultKryoSerializer(classOf[Model], classOf[ModelSerializerKryo])
>>>>>
>>>>> to configure it.
>>>>> I can see checkpoint messages at the output console, but I never hit a
>>>>> breakpoint in the serializer.
>>>>> Any suggestions?
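
A side note on the serializer quoted above: its read method collects bytes until it sees -1. As far as I can tell, Kryo's Input.readByte fails on an exhausted buffer instead of returning -1, and -1 is also a legal value inside the payload, so the loop may stop early or never stop cleanly. One common alternative is to write the payload length ahead of the payload. The sketch below shows that framing under those assumptions; the object name ModelFraming and its two helper methods are hypothetical and are not part of the original code.

```scala
import com.esotericsoftware.kryo.io.{Input, Output}

// Hypothetical helpers illustrating length-prefixed framing with Kryo's Input/Output.
object ModelFraming {

  // Write the model type tag, then the payload length, then the payload itself.
  def writeModel(output: Output, modelType: Long, payload: Array[Byte]): Unit = {
    output.writeLong(modelType)
    output.writeInt(payload.length)
    output.writeBytes(payload)
  }

  // Read back exactly the fields that writeModel produced, in the same order.
  def readModel(input: Input): (Long, Array[Byte]) = {
    val modelType = input.readLong()
    val length = input.readInt()
    (modelType, input.readBytes(length))
  }
}

object ModelFramingDemo extends App {
  // Round trip through an in-memory buffer; the payload deliberately contains -1.
  val out = new Output(64, -1)
  ModelFraming.writeModel(out, 2L, Array[Byte](1, -1, 3))
  val (modelType, payload) = ModelFraming.readModel(new Input(out.toBytes))
  println(s"type=$modelType, bytes=${payload.mkString(",")}")
}
```

In the serializer itself, write would emit getType, the length of toBytes, and the bytes, and read would mirror that order before handing the bytes to the matching factory.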