Hello,

I assume you're passing the class of your serializer in a StateDescriptor constructor.

If so, you could add a breakpoint in Statedescriptor#initializeSerializerUnlessSet, and check what typeInfo is created and which serializer is created as a result.

One thing you could try right away is registering your serializer for the Model implementations,
instead of the trait.

Regards,
Chesnay

On 14.07.2017 15:50, Boris Lublinsky wrote:
Hi
I have several implementations of my Model trait,

trait Model {
   def score(input :AnyVal) :AnyVal def cleanup() :Unit def toBytes() : 
Array[Byte]
   def getType :Long }

neither one of them are serializable, but are used in the state definition.
So I implemented custom serializer

import com.esotericsoftware.kryo.io 
<http://com.esotericsoftware.kryo.io>.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.lightbend.model.modeldescriptor.ModelDescriptor


class ModelSerializerKryoextends Serializer[Model]{
super.setAcceptsNull(false)
   super.setImmutable(true)

/** Reads bytes and returns a new object of the specified concrete type. * <p> * Before Kryo can be used to read child objects, {@link Kryo#reference(Object)} must be called with the parent object to * ensure it can be referenced by the child objects. Any serializer that uses {@link Kryo} to read a child object may need to * be reentrant. * <p> * This method should not be called directly, instead this serializer can be passed to {@link Kryo} read methods that accept a * serialier. * * @return May be null if { @link #getAcceptsNull()} is true. */ override def read(kryo: Kryo, input: Input, `type`:Class[Model]): Model = {

     import ModelSerializerKryo._

     val mType = input.readLong().asInstanceOf[Int]
     val bytes =Stream.continually(input.readByte()).takeWhile(_ != -1).toArray
     factories.get(mType)match {
       case Some(factory) => factory.restore(bytes)
       case _ =>throw new Exception(s"Unknown model type $mTypeto restore")
     }
   }

/** Writes the bytes for the object to the output. * <p> * This method should not be called directly, instead this serializer can be passed to {@link Kryo} write methods that accept a * serialier. * * @param value May be null if { @link #getAcceptsNull()} is true. */ override def write(kryo: Kryo, output: Output, value: Model):Unit = {
     output.writeLong(value.getType)
     output.write(value.toBytes)
   }
}

object ModelSerializerKryo{
   private val factories =Map(ModelDescriptor.ModelType.PMML.value -> PMMLModel, 
ModelDescriptor.ModelType.TENSORFLOW.value -> TensorFlowModel)
}
And added the following

// Add custom serializer env.getConfig.addDefaultKryoSerializer(classOf[Model], 
classOf[ModelSerializerKryo])

To configure it.
I can see checkpoint messages at the output console, but I never hist a break point in serializer.
Any suggestions?



Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com <mailto:boris.lublin...@lightbend.com>
https://www.lightbend.com/


Reply via email to