http://docs.datatorrent.com/troubleshooting/#application-throwing-following-kryo-exception

Please try the suggestions at the above link.

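Judging from

https://github.com/uber/confluent-schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDecoder.java

KafkaAvroDecoder does not have a default (no-arg) constructor. Kryo needs
one to re-create the non-transient "decoder" field when it deserializes
your operator, which is what the serialization trace is pointing at.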
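
Since the operator already builds the decoder in setup(), one option worth trying is to mark the field transient, so that a field serializer which skips transient fields (Kryo's FieldSerializer does so by default) never has to instantiate it. Below is a minimal, self-contained sketch of that pattern. StubDecoder and ConversionOperator are hypothetical stand-ins for KafkaAvroDecoder and AvroBytesConversionOperator, not the real classes, and the sketch only checks the field modifier via reflection rather than running Kryo itself.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Properties;

public class TransientDecoderSketch {

    // Hypothetical stand-in for KafkaAvroDecoder: no no-arg constructor.
    public static class StubDecoder {
        private final String registryUrl;

        public StubDecoder(Properties props) {
            this.registryUrl = props.getProperty("schema.registry.url");
        }

        // Placeholder "decode": returns the URL and the tuple length.
        public String fromBytes(byte[] tuple) {
            return registryUrl + ":" + tuple.length;
        }
    }

    // Hypothetical stand-in for AvroBytesConversionOperator.
    public static class ConversionOperator {
        // Plain configuration, fine to serialize along with the operator.
        private String schemaRegURL;

        // transient: skipped by serializers that ignore transient fields,
        // so the decoder is never instantiated during deserialization.
        private transient StubDecoder decoder;

        public ConversionOperator() {
        }

        public ConversionOperator(String schemaRegURL) {
            this.schemaRegURL = schemaRegURL;
        }

        // Mirrors the operator's setup(): build the decoder after deployment.
        public void setup() {
            Properties props = new Properties();
            props.setProperty("schema.registry.url", schemaRegURL);
            decoder = new StubDecoder(props);
        }

        public String process(byte[] tuple) {
            return decoder.fromBytes(tuple);
        }
    }

    // Reports whether the named field of ConversionOperator is transient.
    public static boolean isTransient(String fieldName) {
        try {
            Field f = ConversionOperator.class.getDeclaredField(fieldName);
            return Modifier.isTransient(f.getModifiers());
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No such field: " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        ConversionOperator op = new ConversionOperator("schemaRegURL");
        op.setup();
        System.out.println(op.process(new byte[] {1, 2, 3}));
        System.out.println("decoder transient: " + isTransient("decoder"));
    }
}
```

In the real operator the equivalent change would be declaring the field as "private transient KafkaAvroDecoder decoder;" and leaving setup() as it is. The schemaRegURL string stays non-transient so it survives serialization and is available when setup() runs.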

Ram

On Mon, Jun 6, 2016 at 9:40 AM, Raja.Aravapalli <[email protected]>
wrote:

>
> Hi,
>
> I am trying to read data from Kafka, and my input in Kafka is Avro
> messages.
>
> So I am using the class "KafkaSinglePortByteArrayInputOperator" to emit
> records from Kafka, and in the next operator I read the input as
> "byte[]" and deserialize the message.
>
> But the tuple deserialization is failing with the error below in the log.
>
> Can someone please share your thoughts and help me fix this?
>
>
> Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): io.confluent.kafka.serializers.KafkaAvroDecoder
> Serialization trace:
> decoder (com.tgt.mighty.apexapps.AvroBytesConversionOperator)
>       at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
>       at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
>       at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
>       at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
>       at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
>       at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>
>
>
> *Code FYR:*
>
>
> *Application.java* file:
>
> public void populateDAG(DAG dag, Configuration conf)
> {
>   //KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("Kafka_Input", KafkaSinglePortStringInputOperator.class);
>
>   KafkaSinglePortByteArrayInputOperator kafkaInput = dag.addOperator("Kafka_Input", new KafkaSinglePortByteArrayInputOperator());
>
>   AvroBytesConversionOperator avroConversion = dag.addOperator("Avro_Convert", new AvroBytesConversionOperator("schemaRegURL"));
>
>   HDFSWrite hdfs = dag.addOperator("To_HDFS", HDFSWrite.class);
>
>   //dag.addStream("Kafka_To_Hdfs_Ingestion", kafkaInput.outputPort, hdfs.input);
>   dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort, avroConversion.input);
>   dag.addStream("Avro_To_String_Stream", avroConversion.output, hdfs.input);
>
> }
>
>
> *Operator Code:*
>
> public class AvroBytesConversionOperator extends BaseOperator {
>
>     private String schemaRegURL;
>     private KafkaAvroDecoder decoder;
>
>     public AvroBytesConversionOperator(){
>
>     }
>
>     public AvroBytesConversionOperator(String schemaRegURL){
>         this.schemaRegURL = schemaRegURL;
>     }
>
>     /**
>      * Defines Input Port - DefaultInputPort
>      * Accepts data from the upstream operator
>      * Type byte[]
>      */
>     public transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>() {
>         @Override
>         public void process(byte[] tuple)
>         {
>             processTuple(tuple);
>         }
>     };
>
>
>     /**
>      * Defines Output Port - DefaultOutputPort
>      * Sends data to the down stream operator which can consume this data
>      * Type String
>      */
>     public transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
>
>
>     /**
>      * Setup call
>      */
>     @Override
>     public void setup(OperatorContext context)
>     {
>         Properties props = new Properties();
>         props.setProperty("schema.registry.url", this.schemaRegURL);
>         this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
>     }
>
>     /**
>      * Begin window call for the operator.
>      * @param windowId
>      */
>     public void beginWindow(long windowId)
>     {
>
>     }
>
>     /**
>      * Defines what should be done with each incoming tuple
>      */
>     protected void processTuple(byte[] tuple)
>     {
>         GenericRecord record = (GenericRecord) decoder.fromBytes(tuple);
>         output.emit(record.toString());
>     }
>
>     /**
>      * End window call for the operator
>      * If sending per window, emit the updated counts here.
>      */
>     @Override
>     public void endWindow()
>     {
>
>     }
>
> }
>
>
