Thanks a lot Thomas & Ramanath. Your suggestions helped!! My issue is fixed. Thank you.
Regards, Raja. From: Thomas Weise <thomas.we...@gmail.com<mailto:thomas.we...@gmail.com>> Reply-To: "users@apex.apache.org<mailto:users@apex.apache.org>" <users@apex.apache.org<mailto:users@apex.apache.org>> Date: Monday, June 6, 2016 at 12:21 PM To: "users@apex.apache.org<mailto:users@apex.apache.org>" <users@apex.apache.org<mailto:users@apex.apache.org>> Subject: Re: avrò deserialization fails when using kafka Since you are creating the decoder in setup(), please mark the property transient. No need to checkpoint it. Thanks, Thomas On Mon, Jun 6, 2016 at 10:06 AM, Munagala Ramanath <r...@datatorrent.com<mailto:r...@datatorrent.com>> wrote: http://docs.datatorrent.com/troubleshooting/#application-throwing-following-kryo-exception Please try the suggestions at the above link. It appears from https://github.com/uber/confluent-schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDecoder.java that the class does not have a default constructor. Ram On Mon, Jun 6, 2016 at 9:40 AM, Raja.Aravapalli <raja.aravapa...@target.com<mailto:raja.aravapa...@target.com>> wrote: Hi, I am trying to read data from kafka, and my input in kafka is avro messages. So I am using class “KafkaSinglePortByteArrayInputOperator” to emit records from kafka.. And in the next operator I am reading input as "byte[]” and deserializing the message!! But the tuple deserialization is failing with below error in the log… Can someone pls share your thoughts and help me fix this? Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): io.confluent.kafka.serializers.KafkaAvroDecoder Serialization trace: decoder (com.tgt.mighty.apexapps.AvroBytesConversionOperator) at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228) at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049) at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058) at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) Code FYR: Application.java file: public void populateDAG(DAG dag, Configuration conf) { //KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("Kafka_Input", KafkaSinglePortStringInputOperator.class); KafkaSinglePortByteArrayInputOperator kafkaInput = dag.addOperator("Kafka_Input", new KafkaSinglePortByteArrayInputOperator()); AvroBytesConversionOperator avroConversion = dag.addOperator("Avro_Convert", new AvroBytesConversionOperator(“schemaRegURL")); HDFSWrite hdfs = dag.addOperator("To_HDFS", HDFSWrite.class); //dag.addStream("Kafka_To_Hdfs_Ingestion", kafkaInput.outputPort, hdfs.input); dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort, avroConversion.input); dag.addStream("Avro_To_String_Stream", avroConversion.output, hdfs.input); } Operator Code: public class AvroBytesConversionOperator extends BaseOperator { private String schemaRegURL; private KafkaAvroDecoder decoder; public AvroBytesConversionOperator(){ } public AvroBytesConversionOperator(String schemaRegURL){ this.schemaRegURL = schemaRegURL; } /** * Defines Input Port - DefaultInputPort * Accepts data from the upstream operator * Type byte[] */ public transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>() { @Override public void process(byte[] tuple) { processTuple(tuple); } }; /** * Defines Output Port - DefaultOutputPort * Sends data to the down stream operator which can consume this data * Type String */ public transient DefaultOutputPort<String> output = new DefaultOutputPort<String>(); /** * Setup call */ @Override public void setup(OperatorContext context) { Properties props = new Properties(); props.setProperty("schema.registry.url", this.schemaRegURL); this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props)); } /** * Begin window call for the operator. * @param windowId */ public void beginWindow(long windowId) { } /** * Defines what should be done with each incoming tuple */ protected void processTuple(byte[] tuple) { GenericRecord record = (GenericRecord) decoder.fromBytes(tuple); output.emit(record.toString()); } /** * End window call for the operator * If sending per window, emit the updated counts here. */ @Override public void endWindow() { } }