http://docs.datatorrent.com/troubleshooting/#application-throwing-following-kryo-exception
Please try the suggestions at the above link.

It appears from
https://github.com/uber/confluent-schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDecoder.java
that the class does not have a default constructor.

Ram

On Mon, Jun 6, 2016 at 9:40 AM, Raja.Aravapalli <[email protected]> wrote:

>
> Hi,
>
> I am trying to read data from kafka, and my input in kafka is avro
> messages.
>
> So I am using class "KafkaSinglePortByteArrayInputOperator" to emit
> records from kafka.. And in the next operator I am reading input as
> "byte[]" and deserializing the message!!
>
> But the tuple deserialization is failing with below error in the log…
>
> Can someone pls share your thoughts and help me fix this?
>
>
> Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): io.confluent.kafka.serializers.KafkaAvroDecoder
> Serialization trace:
> decoder (com.tgt.mighty.apexapps.AvroBytesConversionOperator)
>     at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
>     at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
>     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>
>
>
> *Code FYR:*
>
>
> *Application.java* file:
>
> public void populateDAG(DAG dag, Configuration conf)
> {
>   //KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("Kafka_Input", KafkaSinglePortStringInputOperator.class);
>
>   KafkaSinglePortByteArrayInputOperator kafkaInput = dag.addOperator("Kafka_Input", new KafkaSinglePortByteArrayInputOperator());
>
>   AvroBytesConversionOperator avroConversion = dag.addOperator("Avro_Convert", new AvroBytesConversionOperator("schemaRegURL"));
>
>   HDFSWrite hdfs = dag.addOperator("To_HDFS", HDFSWrite.class);
>
>   //dag.addStream("Kafka_To_Hdfs_Ingestion", kafkaInput.outputPort, hdfs.input);
>   dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort, avroConversion.input);
>   dag.addStream("Avro_To_String_Stream", avroConversion.output, hdfs.input);
>
> }
>
>
> *Operator Code:*
>
> public class AvroBytesConversionOperator extends BaseOperator {
>
>   private String schemaRegURL;
>   private KafkaAvroDecoder decoder;
>
>   public AvroBytesConversionOperator(){
>
>   }
>
>   public AvroBytesConversionOperator(String schemaRegURL){
>     this.schemaRegURL = schemaRegURL;
>   }
>
>   /**
>    * Defines Input Port - DefaultInputPort
>    * Accepts data from the upstream operator
>    * Type byte[]
>    */
>   public transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>() {
>     @Override
>     public void process(byte[] tuple)
>     {
>       processTuple(tuple);
>     }
>   };
>
>
>   /**
>    * Defines Output Port - DefaultOutputPort
>    * Sends data to the down stream operator which can consume this data
>    * Type String
>    */
>   public transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
>
>
>   /**
>    * Setup call
>    */
>   @Override
>   public void setup(OperatorContext context)
>   {
>     Properties props = new Properties();
>     props.setProperty("schema.registry.url", this.schemaRegURL);
>     this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
>   }
>
>   /**
>    * Begin window call for the operator.
>    * @param windowId
>    */
>   public void beginWindow(long windowId)
>   {
>
>   }
>
>   /**
>    * Defines what should be done with each incoming tuple
>    */
>   protected void processTuple(byte[] tuple)
>   {
>     GenericRecord record = (GenericRecord) decoder.fromBytes(tuple);
>     output.emit(record.toString());
>   }
>
>   /**
>    * End window call for the operator
>    * If sending per window, emit the updated counts here.
>    */
>   @Override
>   public void endWindow()
>   {
>
>   }
>
> }
>
>
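
The serialization trace in the quoted message points at the `decoder` field of `AvroBytesConversionOperator`. Since that field is assigned in `setup()`, one commonly suggested fix is to declare it `transient`, so that Kryo's field-based serialization skips it and never needs to instantiate `KafkaAvroDecoder`. The sketch below uses only the JDK to illustrate the two facts involved. `FakeDecoder`, `SketchOperator` and both helper methods are hypothetical stand-ins, not Kryo or Apex APIs, and the reflection check only approximates what Kryo does internally.

```java
import java.lang.reflect.Modifier;

public class TransientFieldSketch {

    // Hypothetical stand-in for KafkaAvroDecoder: its only constructor takes an argument.
    static class FakeDecoder {
        final String url;

        FakeDecoder(String url) {
            this.url = url;
        }
    }

    // Hypothetical stand-in for the operator in the quoted message.
    static class SketchOperator {
        String schemaRegURL;            // plain field: travels with the serialized operator
        transient FakeDecoder decoder;  // transient: not serialized, rebuilt in setup()

        void setup() {
            decoder = new FakeDecoder(schemaRegURL);
        }
    }

    // True if the class declares a constructor that takes no arguments.
    static boolean hasNoArgConstructor(Class<?> type) {
        try {
            type.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // True if the named field is declared with the transient modifier.
    static boolean isTransientField(Class<?> owner, String name) {
        try {
            return Modifier.isTransient(owner.getDeclaredField(name).getModifiers());
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("no such field: " + name, e);
        }
    }

    public static void main(String[] args) {
        System.out.println("FakeDecoder has no-arg constructor: "
                + hasNoArgConstructor(FakeDecoder.class));
        System.out.println("SketchOperator has no-arg constructor: "
                + hasNoArgConstructor(SketchOperator.class));
        System.out.println("decoder is transient: "
                + isTransientField(SketchOperator.class, "decoder"));
        System.out.println("schemaRegURL is transient: "
                + isTransientField(SketchOperator.class, "schemaRegURL"));
    }
}
```

Applied to the operator above, this would mean changing the declaration to `private transient KafkaAvroDecoder decoder;` and leaving `setup()` as it is, assuming `schemaRegURL` itself is still serialized with the operator.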
