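Since you are creating the decoder in setup(), please mark the decoder
field transient. There is no need to checkpoint it, and Kryo skips
transient fields, so it will no longer try to instantiate
KafkaAvroDecoder when the operator is restored.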
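
To illustrate the idea, here is a minimal, self-contained sketch. It is not
the Apex or Kryo code path: it uses plain Java serialization as a stand-in
for checkpointing, and StubDecoder and ConversionOperator are hypothetical
classes that only mirror the shape of KafkaAvroDecoder and your operator.
The point is that a transient field is left out of the serialized state and
has to be rebuilt in setup() after a restore.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class TransientDecoderSketch {

    /** Hypothetical stand-in for KafkaAvroDecoder: no no-arg constructor, not serializable. */
    static class StubDecoder {
        private final String registryUrl;

        StubDecoder(String registryUrl) {
            this.registryUrl = registryUrl;
        }

        String fromBytes(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    /** Hypothetical stand-in for the operator; only the field layout mirrors the original. */
    static class ConversionOperator implements Serializable {
        private static final long serialVersionUID = 1L;

        private String schemaRegURL;            // configuration: part of the saved state
        private transient StubDecoder decoder;  // runtime helper: skipped when saving state

        ConversionOperator(String schemaRegURL) {
            this.schemaRegURL = schemaRegURL;
        }

        /** Rebuilds the decoder from the saved configuration. */
        void setup() {
            this.decoder = new StubDecoder(schemaRegURL);
        }

        boolean hasDecoder() {
            return decoder != null;
        }

        String convert(byte[] tuple) {
            return decoder.fromBytes(tuple);
        }
    }

    /** Serializes and deserializes the operator, standing in for a checkpoint and restore. */
    static ConversionOperator roundTrip(ConversionOperator op)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(op);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            return (ConversionOperator) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ConversionOperator op = new ConversionOperator("http://localhost:8081"); // placeholder URL
        op.setup();

        ConversionOperator restored = roundTrip(op);
        System.out.println("decoder present after restore: " + restored.hasDecoder());

        restored.setup(); // the platform calls setup() again after restoring an operator
        System.out.println(restored.convert("hello".getBytes(StandardCharsets.UTF_8)));
    }
}
```

In your operator the equivalent change should be just the transient keyword
on the decoder field; schemaRegURL stays non-transient so that setup() can
use it after a restore.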

Thanks,
Thomas


On Mon, Jun 6, 2016 at 10:06 AM, Munagala Ramanath <[email protected]>
wrote:

>
> http://docs.datatorrent.com/troubleshooting/#application-throwing-following-kryo-exception
>
> Please try the suggestions at the above link.
>
> It appears from
>
> https://github.com/uber/confluent-schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDecoder.java
> that the class does not have a default constructor.
>
> Ram
>
> On Mon, Jun 6, 2016 at 9:40 AM, Raja.Aravapalli <
> [email protected]> wrote:
>
>>
>> Hi,
>>
>> I am trying to read data from Kafka, and my input in Kafka is Avro
>> messages.
>>
>> So I am using the class "KafkaSinglePortByteArrayInputOperator" to emit
>> records from Kafka. In the next operator I read the input as
>> "byte[]" and deserialize the message.
>>
>> But the tuple deserialization is failing with the error below in the log.
>>
>> Can someone please share your thoughts and help me fix this?
>>
>>
>> Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): io.confluent.kafka.serializers.KafkaAvroDecoder
>> Serialization trace:
>> decoder (com.tgt.mighty.apexapps.AvroBytesConversionOperator)
>>      at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
>>      at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
>>      at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
>>      at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
>>      at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
>>      at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>
>>
>>
>> *Code FYR:*
>>
>>
>> *Application.java* file:
>>
>> public void populateDAG(DAG dag, Configuration conf)
>> {
>>   //KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("Kafka_Input", KafkaSinglePortStringInputOperator.class);
>>
>>   KafkaSinglePortByteArrayInputOperator kafkaInput = dag.addOperator("Kafka_Input", new KafkaSinglePortByteArrayInputOperator());
>>
>>   AvroBytesConversionOperator avroConversion = dag.addOperator("Avro_Convert", new AvroBytesConversionOperator("schemaRegURL"));
>>
>>   HDFSWrite hdfs = dag.addOperator("To_HDFS", HDFSWrite.class);
>>
>>   //dag.addStream("Kafka_To_Hdfs_Ingestion", kafkaInput.outputPort, hdfs.input);
>>   dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort, avroConversion.input);
>>   dag.addStream("Avro_To_String_Stream", avroConversion.output, hdfs.input);
>>
>> }
>>
>>
>> *Operator Code:*
>>
>> public class AvroBytesConversionOperator extends BaseOperator {
>>
>>     private String schemaRegURL;
>>     private KafkaAvroDecoder decoder;
>>
>>     public AvroBytesConversionOperator(){
>>
>>     }
>>
>>     public AvroBytesConversionOperator(String schemaRegURL){
>>         this.schemaRegURL = schemaRegURL;
>>     }
>>
>>     /**
>>      * Defines Input Port - DefaultInputPort
>>      * Accepts data from the upstream operator
>>      * Type byte[]
>>      */
>>     public transient DefaultInputPort<byte[]> input = new 
>> DefaultInputPort<byte[]>() {
>>         @Override
>>         public void process(byte[] tuple)
>>         {
>>             processTuple(tuple);
>>         }
>>     };
>>
>>
>>     /**
>>      * Defines Output Port - DefaultOutputPort
>>      * Sends data to the down stream operator which can consume this data
>>      * Type String
>>      */
>>     public transient DefaultOutputPort<String> output = new 
>> DefaultOutputPort<String>();
>>
>>
>>     /**
>>      * Setup call
>>      */
>>     @Override
>>     public void setup(OperatorContext context)
>>     {
>>         Properties props = new Properties();
>>         props.setProperty("schema.registry.url", this.schemaRegURL);
>>         this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
>>     }
>>
>>     /**
>>      * Begin window call for the operator.
>>      * @param windowId
>>      */
>>     public void beginWindow(long windowId)
>>     {
>>
>>     }
>>
>>     /**
>>      * Defines what should be done with each incoming tuple
>>      */
>>     protected void processTuple(byte[] tuple)
>>     {
>>         GenericRecord record = (GenericRecord) decoder.fromBytes(tuple);
>>         output.emit(record.toString());
>>     }
>>
>>     /**
>>      * End window call for the operator
>>      * If sending per window, emit the updated counts here.
>>      */
>>     @Override
>>     public void endWindow()
>>     {
>>
>>     }
>>
>> }
>>
>>
>
