Hi
I have several implementations of my Model trait:
trait Model {
  def score(input: AnyVal): AnyVal
  def cleanup(): Unit
  def toBytes(): Array[Byte]
  def getType: Long
}
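For illustration, here is a minimal sketch of what one implementation of this trait and its companion factory might look like. The post does not show PMMLModel or TensorFlowModel, so `ModelFactory`, `ScaleModel`, its byte encoding, and its type id 42 are all hypothetical. The only detail taken from the serializer further down is that a factory exposes `restore(bytes)`.

```scala
import java.nio.ByteBuffer

// Same shape as the Model trait above.
trait Model {
  def score(input: AnyVal): AnyVal
  def cleanup(): Unit
  def toBytes(): Array[Byte]
  def getType: Long
}

// Hypothetical factory interface: the serializer calls factory.restore(bytes),
// but the real signature is not shown in the post, so this is an assumption.
trait ModelFactory {
  def restore(bytes: Array[Byte]): Model
}

// Hypothetical toy model that multiplies a Double input by a fixed weight.
class ScaleModel(val weight: Double) extends Model {
  override def score(input: AnyVal): AnyVal = input match {
    case d: Double => d * weight
    case other     => throw new IllegalArgumentException(s"Unsupported input $other")
  }
  override def cleanup(): Unit = ()
  // Encode the single parameter as 8 big-endian bytes.
  override def toBytes(): Array[Byte] = ByteBuffer.allocate(8).putDouble(weight).array()
  override def getType: Long = ScaleModel.TypeId
}

// Companion object doubles as the factory that rebuilds a model from its bytes.
object ScaleModel extends ModelFactory {
  val TypeId: Long = 42L // hypothetical id, not one of ModelDescriptor.ModelType's values
  override def restore(bytes: Array[Byte]): Model =
    new ScaleModel(ByteBuffer.wrap(bytes).getDouble)
}
```

Round-tripping through `toBytes()` and `restore` should give back an equivalent model, which is the contract the serializer relies on.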
None of them is serializable, but they are used in the state definition.
So I implemented a custom serializer:
import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.lightbend.model.modeldescriptor.ModelDescriptor

class ModelSerializerKryo extends Serializer[Model] {
  super.setAcceptsNull(false)
  super.setImmutable(true)

  /** Reads bytes and returns a new object of the specified concrete type.
    * <p>
    * Before Kryo can be used to read child objects, {@link Kryo#reference(Object)} must be
    * called with the parent object to ensure it can be referenced by the child objects.
    * Any serializer that uses {@link Kryo} to read a child object may need to be reentrant.
    * <p>
    * This method should not be called directly; instead, this serializer can be passed to
    * {@link Kryo} read methods that accept a serializer.
    *
    * @return May be null if {@link #getAcceptsNull()} is true. */
  override def read(kryo: Kryo, input: Input, `type`: Class[Model]): Model = {
    import ModelSerializerKryo._
    // Model type id, written as a Long by write()
    val mType = input.readLong().toInt
    // Length-prefixed payload: read exactly as many bytes as write() recorded,
    // instead of scanning for a -1 byte (which can occur inside model bytes)
    val bytes = input.readBytes(input.readInt())
    factories.get(mType) match {
      case Some(factory) => factory.restore(bytes)
      case _ => throw new Exception(s"Unknown model type $mType to restore")
    }
  }

  /** Writes the bytes for the object to the output.
    * <p>
    * This method should not be called directly; instead, this serializer can be passed to
    * {@link Kryo} write methods that accept a serializer.
    *
    * @param value May be null if {@link #getAcceptsNull()} is true. */
  override def write(kryo: Kryo, output: Output, value: Model): Unit = {
    val bytes = value.toBytes()
    output.writeLong(value.getType)
    // Prefix the payload with its length so read() knows where it ends
    output.writeInt(bytes.length)
    output.writeBytes(bytes)
  }
}

object ModelSerializerKryo {
  private val factories = Map(
    ModelDescriptor.ModelType.PMML.value -> PMMLModel,
    ModelDescriptor.ModelType.TENSORFLOW.value -> TensorFlowModel)
}
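Why the payload needs an explicit length: reading "until a -1 byte" stops early whenever the model bytes themselves contain 0xFF, and Kryo's `Input.readByte()` throws on buffer underflow rather than returning -1. The sketch below demonstrates this with plain `java.io` data streams standing in for Kryo's `Output`/`Input`; `FramingDemo` is a hypothetical name. In Kryo, the corresponding calls are `output.writeInt(bytes.length)` / `output.writeBytes(bytes)` and `input.readBytes(input.readInt())`.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object FramingDemo {
  // Sentinel approach (mirrors takeWhile(_ != -1)): stops at the first 0xFF byte.
  def readUntilSentinel(payload: Array[Byte]): Array[Byte] =
    payload.takeWhile(_ != -1)

  // Length-prefixed approach: type id, then payload length, then payload bytes.
  def write(typeId: Long, payload: Array[Byte]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    out.writeLong(typeId)
    out.writeInt(payload.length)
    out.write(payload)
    out.flush()
    bos.toByteArray
  }

  // Read back exactly as many bytes as the prefix says, whatever their values.
  def read(bytes: Array[Byte]): (Long, Array[Byte]) = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val typeId = in.readLong()
    val payload = new Array[Byte](in.readInt())
    in.readFully(payload)
    (typeId, payload)
  }
}
```

With a payload of `1, -1, 2, 3`, the sentinel version keeps only the first byte, while the length-prefixed version returns all four.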
And added the following to configure it:
  // Add custom serializer
  env.getConfig.addDefaultKryoSerializer(classOf[Model], classOf[ModelSerializerKryo])
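A default serializer registered for a supertype is meant to cover its subclasses. As far as we understand, Kryo resolves a default serializer by walking its registered defaults in order and picking the first entry whose registered type `isAssignableFrom` the concrete class. A default for `Model` should therefore apply to `PMMLModel` and `TensorFlowModel`, provided Flink actually hands the type to Kryo. The plain-Scala sketch below imitates only that lookup. `DefaultSerializerLookup` and the stand-in classes are hypothetical; this is not Kryo's or Flink's code.

```scala
// Hypothetical stand-ins for the Model hierarchy; not the real classes.
trait Model
class PMMLModel extends Model
class TensorFlowModel extends Model

object DefaultSerializerLookup {
  // Ordered (registeredType -> serializerName) pairs, mimicking a default-serializer table.
  private val defaults: Seq[(Class[_], String)] = Seq(classOf[Model] -> "ModelSerializerKryo")

  // First serializer whose registered type is the class itself or one of its supertypes.
  def lookup(cls: Class[_]): Option[String] =
    defaults.collectFirst { case (t, s) if t.isAssignableFrom(cls) => s }
}
```

Unrelated classes such as `String` find no match here and would fall back to whatever generic serializer is configured.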
I can see checkpoint messages in the console output, but I never hit a breakpoint in the serializer.
Any suggestions?
Boris Lublinsky
FDP Architect
https://www.lightbend.com/