Is there a chance this can be answered?

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> Begin forwarded message:
>
> From: Boris Lublinsky <boris.lublin...@lightbend.com>
> Subject: Re: Custom Kryo serializer
> Date: July 19, 2017 at 8:28:16 AM CDT
> To: user@flink.apache.org, ches...@apache.org
>
> Thanks for the reply, but I am not using it for managed state, but rather
> for the raw state.
> In my implementation I have the following:
>
> class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, Double]{
>
>   // The managed keyed state, see
>   // https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
>   var modelState: ValueState[ModelToServeStats] = _
>   var newModelState: ValueState[ModelToServeStats] = _
>   // The raw state -
>   // https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state
>   var currentModel : Option[Model] = None
>   var newModel : Option[Model] = None
>
> Here the current and new model are instances of the trait for which I
> implemented the serializer.
> According to the documentation
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state
>
> “Raw State is state that operators keep in their own data structures. When
> checkpointed, they only write a sequence of bytes into the checkpoint. Flink
> knows nothing about the state’s data structures and sees only the raw bytes.”
>
> So I was assuming that I need to provide a serializer for this.
> Am I missing something?
>
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com
> https://www.lightbend.com/
>
>> ---------- Forwarded message ----------
>> From: Chesnay Schepler <ches...@apache.org>
>> Date: Wed, Jul 19, 2017 at 1:34 PM
>> Subject: Re: Custom Kryo serializer
>> To: user@flink.apache.org
>>
>> Hello,
>>
>> I assume you're passing the class of your serializer in a StateDescriptor
>> constructor.
>>
>> If so, you could add a breakpoint in
>> StateDescriptor#initializeSerializerUnlessSet,
>> and check what typeInfo is created and which serializer is created as a
>> result.
>>
>> One thing you could try right away is registering your serializer for the
>> Model implementations, instead of the trait.
>>
>> Regards,
>> Chesnay
>>
>> On 14.07.2017 15:50, Boris Lublinsky wrote:
>>> Hi
>>> I have several implementations of my Model trait,
>>>
>>> trait Model {
>>>   def score(input : AnyVal) : AnyVal
>>>   def cleanup() : Unit
>>>   def toBytes() : Array[Byte]
>>>   def getType : Long
>>> }
>>>
>>> None of them is serializable, but they are used in the state definition.
>>> So I implemented a custom serializer:
>>>
>>> import com.esotericsoftware.kryo.io.{Input, Output}
>>> import com.esotericsoftware.kryo.{Kryo, Serializer}
>>> import com.lightbend.model.modeldescriptor.ModelDescriptor
>>>
>>> class ModelSerializerKryo extends Serializer[Model]{
>>>
>>>   super.setAcceptsNull(false)
>>>   super.setImmutable(true)
>>>
>>>   /** Reads bytes and returns a new object of the specified concrete type.
>>>     * <p>
>>>     * Before Kryo can be used to read child objects, {@link Kryo#reference(Object)} must be called with the parent object to
>>>     * ensure it can be referenced by the child objects. Any serializer that uses {@link Kryo} to read a child object may need to
>>>     * be reentrant.
>>>     * <p>
>>>     * This method should not be called directly, instead this serializer can be passed to {@link Kryo} read methods that accept a
>>>     * serializer.
>>>     *
>>>     * @return May be null if {@link #getAcceptsNull()} is true. */
>>>   override def read(kryo: Kryo, input: Input, `type`: Class[Model]): Model = {
>>>
>>>     import ModelSerializerKryo._
>>>
>>>     val mType = input.readLong().asInstanceOf[Int]
>>>     val bytes = Stream.continually(input.readByte()).takeWhile(_ != -1).toArray
>>>     factories.get(mType) match {
>>>       case Some(factory) => factory.restore(bytes)
>>>       case _ => throw new Exception(s"Unknown model type $mType to restore")
>>>     }
>>>   }
>>>
>>>   /** Writes the bytes for the object to the output.
>>>     * <p>
>>>     * This method should not be called directly, instead this serializer can be passed to {@link Kryo} write methods that accept a
>>>     * serializer.
>>>     *
>>>     * @param value May be null if {@link #getAcceptsNull()} is true. */
>>>   override def write(kryo: Kryo, output: Output, value: Model): Unit = {
>>>     output.writeLong(value.getType)
>>>     output.write(value.toBytes)
>>>   }
>>> }
>>>
>>> object ModelSerializerKryo{
>>>   private val factories = Map(
>>>     ModelDescriptor.ModelType.PMML.value -> PMMLModel,
>>>     ModelDescriptor.ModelType.TENSORFLOW.value -> TensorFlowModel)
>>> }
>>>
>>> And added the following to configure it:
>>>
>>> // Add custom serializer
>>> env.getConfig.addDefaultKryoSerializer(classOf[Model], classOf[ModelSerializerKryo])
>>>
>>> I can see checkpoint messages in the output console, but I never hit a
>>> breakpoint in the serializer.
>>> Any suggestions?
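
A side note on the quoted `read` method, separate from the question of why the serializer is never invoked: it collects bytes until it sees -1, but -1 is a legitimate byte value inside a model payload, so a sentinel is a fragile way to find the end. One common alternative is to write the payload length before the payload. The sketch below shows only that framing idea using plain `java.io` streams, not Kryo or Flink. `SketchModel`, `BytesModel` and `FramedModelCodec` are hypothetical names made up for this illustration, and the factory map merely stands in for the `factories` map from the thread.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical stand-in for the Model trait from the thread (only the parts needed here).
trait SketchModel {
  def toBytes: Array[Byte]
  def getType: Long
}

// Hypothetical model implementation that simply wraps its serialized bytes.
final case class BytesModel(kind: Long, payload: Array[Byte]) extends SketchModel {
  def toBytes: Array[Byte] = payload
  def getType: Long = kind
}

object FramedModelCodec {
  // Type tag -> function that rebuilds a model from its bytes
  // (plays the role of the `factories` map in the quoted code).
  private val factories: Map[Long, Array[Byte] => SketchModel] = Map(
    0L -> ((bytes: Array[Byte]) => BytesModel(0L, bytes)),
    1L -> ((bytes: Array[Byte]) => BytesModel(1L, bytes))
  )

  // Layout: 8-byte type tag, 4-byte payload length, then the payload itself.
  def write(model: SketchModel): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    val payload = model.toBytes
    out.writeLong(model.getType)
    out.writeInt(payload.length)
    out.write(payload)
    out.flush()
    buffer.toByteArray
  }

  def read(bytes: Array[Byte]): SketchModel = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val mType = in.readLong()
    val payload = new Array[Byte](in.readInt())
    // Reads exactly the announced number of bytes, including any -1 values.
    in.readFully(payload)
    factories.get(mType) match {
      case Some(restore) => restore(payload)
      case None => throw new IllegalArgumentException(s"Unknown model type $mType to restore")
    }
  }
}
```

A payload such as `Array[Byte](1, -1, 2, -1)` survives a `write`/`read` round trip with this layout, whereas a read loop that stops at the first -1 would cut it short. Inside a Kryo serializer the same layout could presumably be produced with `output.writeInt` and `output.writeBytes`, and consumed with `input.readInt` and `input.readBytes(length)`, but that should be checked against the Kryo version bundled with Flink.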