Since you are creating the decoder in setup(), please mark the property transient. No need to checkpoint it.
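
To make the suggestion concrete, here is a minimal, self-contained sketch of the idea. It is not the Apex operator itself: StubDecoder and ConversionOperator are hypothetical stand-ins for KafkaAvroDecoder and AvroBytesConversionOperator, the registry URL is a placeholder, and plain Java serialization stands in for Kryo, on the assumption that both skip transient fields by default. The point is only that a transient field is left out of the saved state and rebuilt in setup().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDecoderSketch {

    /** Hypothetical stand-in for KafkaAvroDecoder: no no-arg constructor, not serializable. */
    static class StubDecoder {
        private final String registryUrl;

        StubDecoder(String registryUrl) {
            this.registryUrl = registryUrl;
        }

        String fromBytes(byte[] tuple) {
            return registryUrl + " decoded " + tuple.length + " bytes";
        }
    }

    /** Hypothetical stand-in for the operator; only schemaRegURL belongs to its saved state. */
    static class ConversionOperator implements Serializable {
        private static final long serialVersionUID = 1L;

        private String schemaRegURL;

        // transient: skipped when the operator state is written, rebuilt in setup()
        private transient StubDecoder decoder;

        ConversionOperator(String schemaRegURL) {
            this.schemaRegURL = schemaRegURL;
        }

        /** Recreates the decoder from the saved configuration. */
        void setup() {
            this.decoder = new StubDecoder(schemaRegURL);
        }

        boolean hasDecoder() {
            return decoder != null;
        }

        String processTuple(byte[] tuple) {
            return decoder.fromBytes(tuple);
        }
    }

    /** Serializes and deserializes the operator, mimicking a checkpoint followed by a restore. */
    static ConversionOperator roundTrip(ConversionOperator op)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(op); // succeeds even though StubDecoder is not serializable
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            return (ConversionOperator) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ConversionOperator op = new ConversionOperator("http://localhost:8081"); // placeholder URL
        op.setup();

        ConversionOperator restored = roundTrip(op);
        System.out.println(restored.hasDecoder()); // prints false: the decoder was not saved

        restored.setup(); // the decoder is rebuilt from schemaRegURL
        System.out.println(restored.processTuple(new byte[] {1, 2, 3}));
    }
}
```

In your operator the equivalent change would presumably be just "private transient KafkaAvroDecoder decoder;", with setup() left as it is.
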
Thanks,
Thomas

On Mon, Jun 6, 2016 at 10:06 AM, Munagala Ramanath <[email protected]> wrote:

> http://docs.datatorrent.com/troubleshooting/#application-throwing-following-kryo-exception
>
> Please try the suggestions at the above link.
>
> It appears from
> https://github.com/uber/confluent-schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDecoder.java
> that the class does not have a default constructor.
>
> Ram
>
> On Mon, Jun 6, 2016 at 9:40 AM, Raja.Aravapalli <[email protected]> wrote:
>
>> Hi,
>>
>> I am trying to read data from kafka, and my input in kafka is avro messages.
>>
>> So I am using class "KafkaSinglePortByteArrayInputOperator" to emit
>> records from kafka.. And in the next operator I am reading input as
>> "byte[]" and deserializing the message!!
>>
>> But the tuple deserialization is failing with below error in the log…
>>
>> Can someone pls share your thoughts and help me fix this?
>>
>>
>> Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): io.confluent.kafka.serializers.KafkaAvroDecoder
>> Serialization trace:
>> decoder (com.tgt.mighty.apexapps.AvroBytesConversionOperator)
>>     at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
>>     at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
>>     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>
>>
>> *Code FYR:*
>>
>>
>> *Application.java* file:
>>
>> public void populateDAG(DAG dag, Configuration conf)
>> {
>>   //KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("Kafka_Input", KafkaSinglePortStringInputOperator.class);
>>
>>   KafkaSinglePortByteArrayInputOperator kafkaInput = dag.addOperator("Kafka_Input", new KafkaSinglePortByteArrayInputOperator());
>>
>>   AvroBytesConversionOperator avroConversion = dag.addOperator("Avro_Convert", new AvroBytesConversionOperator("schemaRegURL"));
>>
>>   HDFSWrite hdfs = dag.addOperator("To_HDFS", HDFSWrite.class);
>>
>>   //dag.addStream("Kafka_To_Hdfs_Ingestion", kafkaInput.outputPort, hdfs.input);
>>   dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort, avroConversion.input);
>>   dag.addStream("Avro_To_String_Stream", avroConversion.output, hdfs.input);
>>
>> }
>>
>>
>> *Operator Code:*
>>
>> public class AvroBytesConversionOperator extends BaseOperator {
>>
>>   private String schemaRegURL;
>>   private KafkaAvroDecoder decoder;
>>
>>   public AvroBytesConversionOperator(){
>>
>>   }
>>
>>   public AvroBytesConversionOperator(String schemaRegURL){
>>     this.schemaRegURL = schemaRegURL;
>>   }
>>
>>   /**
>>    * Defines Input Port - DefaultInputPort
>>    * Accepts data from the upstream operator
>>    * Type byte[]
>>    */
>>   public transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>() {
>>     @Override
>>     public void process(byte[] tuple)
>>     {
>>       processTuple(tuple);
>>     }
>>   };
>>
>>
>>   /**
>>    * Defines Output Port - DefaultOutputPort
>>    * Sends data to the down stream operator which can consume this data
>>    * Type String
>>    */
>>   public transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
>>
>>
>>   /**
>>    * Setup call
>>    */
>>   @Override
>>   public void setup(OperatorContext context)
>>   {
>>     Properties props = new Properties();
>>     props.setProperty("schema.registry.url", this.schemaRegURL);
>>     this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
>>   }
>>
>>   /**
>>    * Begin window call for the operator.
>>    * @param windowId
>>    */
>>   public void beginWindow(long windowId)
>>   {
>>
>>   }
>>
>>   /**
>>    * Defines what should be done with each incoming tuple
>>    */
>>   protected void processTuple(byte[] tuple)
>>   {
>>     GenericRecord record = (GenericRecord) decoder.fromBytes(tuple);
>>     output.emit(record.toString());
>>   }
>>
>>   /**
>>    * End window call for the operator
>>    * If sending per window, emit the updated counts here.
>>    */
>>   @Override
>>   public void endWindow()
>>   {
>>
>>   }
>>
>> }
