Hey Raja,

By default, the Kafka input operator and the conversion operator will run
in different containers. You can set the stream locality to THREAD_LOCAL or
CONTAINER_LOCAL so they are deployed together. The input operator is I/O
intensive and your conversion operator is likely CPU intensive, correct me
if I am wrong.
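For example, a minimal sketch of setting the locality in populateDAG(),
using the stream name from your application below:

    dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort,
        avroConversion.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
    // or DAG.Locality.THREAD_LOCAL to run both in the same thread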
Another option is to extend AbstractKafkaSinglePortInputOperator and
override the getTuple method, so the Avro decoding happens inside the input
operator itself.
Here is an example:

import java.util.Properties;

import org.apache.avro.generic.GenericRecord;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.contrib.kafka.AbstractKafkaSinglePortInputOperator;

import io.confluent.kafka.serializers.KafkaAvroDecoder;
import kafka.message.Message;
import kafka.utils.VerifiableProperties;

public class AvroKafkaInputOperator extends
AbstractKafkaSinglePortInputOperator<GenericRecord>{

    private String schemaRegURL;
    // transient: recreated in setup(), so Kryo need not checkpoint it
    // (KafkaAvroDecoder has no default constructor and would fail otherwise)
    private transient KafkaAvroDecoder decoder;

    public AvroKafkaInputOperator(){

    }

    public AvroKafkaInputOperator(String schemaRegURL){
        this.schemaRegURL = schemaRegURL;
    }

    /**
     * Setup call: create the Avro decoder against the schema registry
     */
    @Override
    public void setup(OperatorContext context)
    {
        super.setup(context);
        Properties props = new Properties();
        props.setProperty("schema.registry.url", this.schemaRegURL);
        this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
    }

    @Override
    public GenericRecord getTuple(Message msg)
    {
        // copy the payload ByteBuffer into a byte[] before decoding
        byte[] bytes = new byte[msg.payload().remaining()];
        msg.payload().get(bytes);
        return (GenericRecord) decoder.fromBytes(bytes);
    }
}
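
Wiring it into the DAG would then look something like this (the operator
name and registry URL here are just placeholders):

    AvroKafkaInputOperator kafkaInput = dag.addOperator("Kafka_Input",
        new AvroKafkaInputOperator("http://localhost:8081"));
    // the downstream operator's input port must accept GenericRecord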




On Mon, Jun 6, 2016 at 11:54 AM, Raja.Aravapalli <raja.aravapa...@target.com
> wrote:

>
> After making the variable transient, it worked fine.
>
> Raja.
>
> From: "Raja.Aravapalli" <raja.aravapa...@target.com>
> Date: Monday, June 6, 2016 at 1:52 PM
>
> To: "users@apex.apache.org" <users@apex.apache.org>
> Subject: Re: avro deserialization fails when using kafka
>
>
> Thanks a lot Thomas & Ramanath.
>
> Your suggestions helped!! My issue is fixed. Thank you.
>
>
> Regards,
> Raja.
>
> From: Thomas Weise <thomas.we...@gmail.com>
> Reply-To: "users@apex.apache.org" <users@apex.apache.org>
> Date: Monday, June 6, 2016 at 12:21 PM
> To: "users@apex.apache.org" <users@apex.apache.org>
> Subject: Re: avro deserialization fails when using kafka
>
> Since you are creating the decoder in setup(), please mark the property
> transient. No need to checkpoint it.
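>
> For example, the one-line change (sketch):
>
>     private transient KafkaAvroDecoder decoder;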
>
> Thanks,
> Thomas
>
>
> On Mon, Jun 6, 2016 at 10:06 AM, Munagala Ramanath <r...@datatorrent.com>
> wrote:
>
>>
>> http://docs.datatorrent.com/troubleshooting/#application-throwing-following-kryo-exception
>>
>> Please try the suggestions at the above link.
>>
>> It appears from
>>
>> https://github.com/uber/confluent-schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDecoder.java
>> that the class does not have a default constructor.
>>
>> Ram
>>
>> On Mon, Jun 6, 2016 at 9:40 AM, Raja.Aravapalli <
>> raja.aravapa...@target.com> wrote:
>>
>>>
>>> Hi,
>>>
>>> I am trying to read data from kafka, and my input in kafka is avro
>>> messages.
>>>
>>> So I am using the class "KafkaSinglePortByteArrayInputOperator" to emit
>>> records from kafka, and in the next operator I am reading the input as
>>> "byte[]" and deserializing the message.
>>>
>>> But the tuple deserialization is failing with the error below in the log.
>>>
>>> Can someone please share your thoughts and help me fix this?
>>>
>>>
>>> Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created 
>>> (missing no-arg constructor): 
>>> io.confluent.kafka.serializers.KafkaAvroDecoder
>>> Serialization trace:
>>> decoder (com.tgt.mighty.apexapps.AvroBytesConversionOperator)
>>>     at 
>>> com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
>>>     at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
>>>     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
>>>     at 
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
>>>     at 
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>
>>>
>>>
>>> *Code FYR:*
>>>
>>>
>>> *Application.java* file:
>>>
>>> public void populateDAG(DAG dag, Configuration conf)
>>> {
>>>   //KafkaSinglePortStringInputOperator kafkaInput =  
>>> dag.addOperator("Kafka_Input", KafkaSinglePortStringInputOperator.class);
>>>
>>>   KafkaSinglePortByteArrayInputOperator kafkaInput =  
>>> dag.addOperator("Kafka_Input", new KafkaSinglePortByteArrayInputOperator());
>>>
>>>   AvroBytesConversionOperator avroConversion = 
>>> dag.addOperator("Avro_Convert", new 
>>> AvroBytesConversionOperator("schemaRegURL"));
>>>
>>>   HDFSWrite hdfs = dag.addOperator("To_HDFS", HDFSWrite.class);
>>>
>>>   //dag.addStream("Kafka_To_Hdfs_Ingestion", kafkaInput.outputPort, 
>>> hdfs.input);
>>>   dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort, 
>>> avroConversion.input);
>>>   dag.addStream("Avro_To_String_Stream", avroConversion.output, hdfs.input);
>>>
>>> }
>>>
>>>
>>> *Operator Code:*
>>>
>>> public class AvroBytesConversionOperator extends BaseOperator {
>>>
>>>     private String schemaRegURL;
>>>     private KafkaAvroDecoder decoder;
>>>
>>>     public AvroBytesConversionOperator(){
>>>
>>>     }
>>>
>>>     public AvroBytesConversionOperator(String schemaRegURL){
>>>         this.schemaRegURL = schemaRegURL;
>>>     }
>>>
>>>     /**
>>>      * Defines Input Port - DefaultInputPort
>>>      * Accepts data from the upstream operator
>>>      * Type byte[]
>>>      */
>>>     public transient DefaultInputPort<byte[]> input = new 
>>> DefaultInputPort<byte[]>() {
>>>         @Override
>>>         public void process(byte[] tuple)
>>>         {
>>>             processTuple(tuple);
>>>         }
>>>     };
>>>
>>>
>>>     /**
>>>      * Defines Output Port - DefaultOutputPort
>>>      * Sends data to the down stream operator which can consume this data
>>>      * Type String
>>>      */
>>>     public transient DefaultOutputPort<String> output = new 
>>> DefaultOutputPort<String>();
>>>
>>>
>>>     /**
>>>      * Setup call
>>>      */
>>>     @Override
>>>     public void setup(OperatorContext context)
>>>     {
>>>         Properties props = new Properties();
>>>         props.setProperty("schema.registry.url", this.schemaRegURL);
>>>         this.decoder = new KafkaAvroDecoder(new 
>>> VerifiableProperties(props));
>>>     }
>>>
>>>     /**
>>>      * Begin window call for the operator.
>>>      * @param windowId
>>>      */
>>>     public void beginWindow(long windowId)
>>>     {
>>>
>>>     }
>>>
>>>     /**
>>>      * Defines what should be done with each incoming tuple
>>>      */
>>>     protected void processTuple(byte[] tuple)
>>>     {
>>>         GenericRecord record = (GenericRecord) decoder.fromBytes(tuple);
>>>         output.emit(record.toString());
>>>     }
>>>
>>>     /**
>>>      * End window call for the operator
>>>      * If sending per window, emit the updated counts here.
>>>      */
>>>     @Override
>>>     public void endWindow()
>>>     {
>>>
>>>     }
>>>
>>> }
>>>
>>>
>>
>
