Hey Raja,

By default, the Kafka input operator and the conversion operator will run in different containers. You can set the stream locality to THREAD_LOCAL or CONTAINER_LOCAL: the input operator is I/O-intensive and your conversion operator could be CPU-intensive (correct me if I am wrong), so co-locating them avoids the inter-container serialization and network hop.
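For example, you could set the locality when wiring the stream in populateDAG() (a minimal sketch; the operator and stream names are taken from your code quoted below, so adjust as needed):

    // co-locate the CPU-intensive conversion with the I/O-intensive input operator
    dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort, avroConversion.input)
       .setLocality(DAG.Locality.CONTAINER_LOCAL); // or DAG.Locality.THREAD_LOCAL for the same thread

Another practice is to extend AbstractKafkaSinglePortInputOperator and override the getTuple() method, so the Avro decoding happens inside the input operator itself and no separate conversion operator is needed. Here is an example: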
import java.nio.ByteBuffer;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.contrib.kafka.AbstractKafkaSinglePortInputOperator;

import io.confluent.kafka.serializers.KafkaAvroDecoder;
import kafka.message.Message;
import kafka.utils.VerifiableProperties;

public class AvroKafkaInputOperator extends AbstractKafkaSinglePortInputOperator<GenericRecord>
{
  private String schemaRegURL;
  // transient: KafkaAvroDecoder has no no-arg constructor and is recreated
  // in setup(), so it must not be part of the checkpointed state
  private transient KafkaAvroDecoder decoder;

  public AvroKafkaInputOperator()
  {
  }

  public AvroKafkaInputOperator(String schemaRegURL)
  {
    this.schemaRegURL = schemaRegURL;
  }

  /**
   * Setup call
   */
  @Override
  public void setup(OperatorContext context)
  {
    super.setup(context);
    Properties props = new Properties();
    props.setProperty("schema.registry.url", this.schemaRegURL);
    this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
  }

  @Override
  public GenericRecord getTuple(Message msg)
  {
    // copy the payload out of the message's ByteBuffer before decoding
    ByteBuffer payload = msg.payload();
    byte[] bytes = new byte[payload.remaining()];
    payload.get(bytes);
    return (GenericRecord) decoder.fromBytes(bytes);
  }
}

On Mon, Jun 6, 2016 at 11:54 AM, Raja.Aravapalli <raja.aravapa...@target.com> wrote:

>
> After making the variable transient, it worked fine.
>
> Raja.
>
> From: "Raja.Aravapalli" <raja.aravapa...@target.com>
> Date: Monday, June 6, 2016 at 1:52 PM
> To: "users@apex.apache.org" <users@apex.apache.org>
> Subject: Re: avro deserialization fails when using kafka
>
> Thanks a lot Thomas & Ramanath.
>
> Your suggestions helped!! My issue is fixed. Thank you.
>
> Regards,
> Raja.
>
> From: Thomas Weise <thomas.we...@gmail.com>
> Reply-To: "users@apex.apache.org" <users@apex.apache.org>
> Date: Monday, June 6, 2016 at 12:21 PM
> To: "users@apex.apache.org" <users@apex.apache.org>
> Subject: Re: avro deserialization fails when using kafka
>
> Since you are creating the decoder in setup(), please mark the property transient. No need to checkpoint it.
>
> Thanks,
> Thomas
>
> On Mon, Jun 6, 2016 at 10:06 AM, Munagala Ramanath <r...@datatorrent.com> wrote:
>
>> http://docs.datatorrent.com/troubleshooting/#application-throwing-following-kryo-exception
>>
>> Please try the suggestions at the above link.
>>
>> It appears from
>> https://github.com/uber/confluent-schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDecoder.java
>> that the class does not have a default constructor.
>>
>> Ram
>>
>> On Mon, Jun 6, 2016 at 9:40 AM, Raja.Aravapalli <raja.aravapa...@target.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to read data from Kafka, and my input in Kafka is Avro messages.
>>>
>>> So I am using the class "KafkaSinglePortByteArrayInputOperator" to emit records from Kafka, and in the next operator I am reading the input as "byte[]" and deserializing the message.
>>>
>>> But the tuple deserialization is failing with the below error in the log…
>>>
>>> Can someone please share your thoughts and help me fix this?
>>>
>>> Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): io.confluent.kafka.serializers.KafkaAvroDecoder
>>> Serialization trace:
>>> decoder (com.tgt.mighty.apexapps.AvroBytesConversionOperator)
>>>         at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
>>>         at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
>>>         at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
>>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
>>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>
>>> *Code FYR:*
>>>
>>> *Application.java* file:
>>>
>>> public void populateDAG(DAG dag, Configuration conf)
>>> {
>>>   //KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("Kafka_Input", KafkaSinglePortStringInputOperator.class);
>>>
>>>   KafkaSinglePortByteArrayInputOperator kafkaInput = dag.addOperator("Kafka_Input", new KafkaSinglePortByteArrayInputOperator());
>>>   AvroBytesConversionOperator avroConversion = dag.addOperator("Avro_Convert", new AvroBytesConversionOperator("schemaRegURL"));
>>>   HDFSWrite hdfs = dag.addOperator("To_HDFS", HDFSWrite.class);
>>>
>>>   //dag.addStream("Kafka_To_Hdfs_Ingestion", kafkaInput.outputPort, hdfs.input);
>>>   dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort, avroConversion.input);
>>>   dag.addStream("Avro_To_String_Stream", avroConversion.output, hdfs.input);
>>> }
>>>
>>> *Operator Code:*
>>>
>>> public class AvroBytesConversionOperator extends BaseOperator {
>>>
>>>   private String schemaRegURL;
>>>   private KafkaAvroDecoder decoder;
>>>
>>>   public AvroBytesConversionOperator(){
>>>   }
>>>
>>>   public AvroBytesConversionOperator(String schemaRegURL){
>>>     this.schemaRegURL = schemaRegURL;
>>>   }
>>>
>>>   /**
>>>    * Defines Input Port - DefaultInputPort
>>>    * Accepts data from the upstream operator
>>>    * Type byte[]
>>>    */
>>>   public transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>() {
>>>     @Override
>>>     public void process(byte[] tuple)
>>>     {
>>>       processTuple(tuple);
>>>     }
>>>   };
>>>
>>>   /**
>>>    * Defines Output Port - DefaultOutputPort
>>>    * Sends data to the downstream operator which can consume this data
>>>    * Type String
>>>    */
>>>   public transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
>>>
>>>   /**
>>>    * Setup call
>>>    */
>>>   @Override
>>>   public void setup(OperatorContext context)
>>>   {
>>>     Properties props = new Properties();
>>>     props.setProperty("schema.registry.url", this.schemaRegURL);
>>>     this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
>>>   }
>>>
>>>   /**
>>>    * Begin window call for the operator.
>>>    * @param windowId
>>>    */
>>>   public void beginWindow(long windowId)
>>>   {
>>>   }
>>>
>>>   /**
>>>    * Defines what should be done with each incoming tuple
>>>    */
>>>   protected void processTuple(byte[] tuple)
>>>   {
>>>     GenericRecord record = (GenericRecord) decoder.fromBytes(tuple);
>>>     output.emit(record.toString());
>>>   }
>>>
>>>   /**
>>>    * End window call for the operator
>>>    * If sending per window, emit the updated counts here.
>>>    */
>>>   @Override
>>>   public void endWindow()
>>>   {
>>>   }
>>> }
>>>
>>
>
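For anyone hitting the same Kryo exception: the fix that worked in this thread is simply to keep the decoder out of the checkpointed state, since setup() recreates it after every (re)start. A minimal sketch of the changed lines in AvroBytesConversionOperator:

    private String schemaRegURL;                 // plain field: part of the checkpointed configuration
    private transient KafkaAvroDecoder decoder;  // transient: Kryo skips it when checkpointing the operator

    @Override
    public void setup(OperatorContext context)
    {
      // rebuilt here on deploy and on recovery, so nothing is lost by not checkpointing it
      Properties props = new Properties();
      props.setProperty("schema.registry.url", this.schemaRegURL);
      this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
    }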