Hey Raja,
By default, the Kafka input operator and the conversion operator run in
different containers. You can set the stream locality to THREAD_LOCAL (both
operators in the same thread) or CONTAINER_LOCAL (same container, separate
threads). Keep in mind that the input operator is I/O-intensive and your
conversion operator could be CPU-intensive; correct me if I am wrong.
Another option is to extend AbstractKafkaSinglePortInputOperator and
override its getTuple method, so the input operator emits decoded Avro
records directly. Here is an example:
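import java.nio.ByteBuffer;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.contrib.kafka.AbstractKafkaSinglePortInputOperator;
import io.confluent.kafka.serializers.KafkaAvroDecoder;
import kafka.message.Message;
import kafka.utils.VerifiableProperties;

public class AvroKafkaInputOperator extends
    AbstractKafkaSinglePortInputOperator<GenericRecord> {
  private String schemaRegURL;
  // Created in setup(), so mark it transient: it must not be checkpointed
  private transient KafkaAvroDecoder decoder;

  public AvroKafkaInputOperator() {
  }

  public AvroKafkaInputOperator(String schemaRegURL) {
    this.schemaRegURL = schemaRegURL;
  }

  /**
   * Setup call: let the base class set up the Kafka consumer first,
   * then create the Avro decoder.
   */
  @Override
  public void setup(OperatorContext context)
  {
    super.setup(context);
    Properties props = new Properties();
    props.setProperty("schema.registry.url", this.schemaRegURL);
    this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
  }

  // Converts each Kafka message payload into an Avro GenericRecord
  @Override
  public GenericRecord getTuple(Message msg)
  {
    // payload() returns a java.nio.ByteBuffer; copy its readable bytes
    ByteBuffer payload = msg.payload();
    byte[] bytes = new byte[payload.remaining()];
    payload.get(bytes);
    return (GenericRecord) decoder.fromBytes(bytes); // fromBytes() returns Object
  }
}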
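getTuple() has to hand KafkaAvroDecoder.fromBytes() a byte[], while Kafka 0.8's Message.payload() returns a java.nio.ByteBuffer. Below is a small stdlib-only sketch of that copy as a standalone helper; PayloadBytes and toBytes are hypothetical names, not part of Kafka or Malhar. Reading through duplicate() leaves the caller's buffer position untouched, which is a defensive choice rather than something the Kafka API requires.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PayloadBytes {

  // Copies the readable bytes of a payload buffer into a new byte[].
  // duplicate() shares the content but has its own position and limit,
  // so the caller's buffer is not advanced by the read.
  static byte[] toBytes(ByteBuffer payload) {
    ByteBuffer view = payload.duplicate();
    byte[] bytes = new byte[view.remaining()];
    view.get(bytes);
    return bytes;
  }

  public static void main(String[] args) {
    ByteBuffer payload = ByteBuffer.wrap("avro-bytes".getBytes(StandardCharsets.UTF_8));
    byte[] copy = toBytes(payload);
    System.out.println(new String(copy, StandardCharsets.UTF_8)); // avro-bytes
    System.out.println(payload.remaining());                       // 10 (unchanged)
  }
}
```

Inside getTuple the helper would be used as decoder.fromBytes(PayloadBytes.toBytes(msg.payload())), with the result cast to GenericRecord.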
On Mon, Jun 6, 2016 at 11:54 AM, Raja.Aravapalli <[email protected]> wrote:
>
> After making the decoder field transient, it worked fine.
>
> Raja.
>
> From: "Raja.Aravapalli" <[email protected]>
> Date: Monday, June 6, 2016 at 1:52 PM
>
> To: "[email protected]" <[email protected]>
> Subject: Re: avro deserialization fails when using kafka
>
>
> Thanks a lot Thomas & Ramanath.
>
> Your suggestions helped!! My issue is fixed. Thank you.
>
>
> Regards,
> Raja.
>
> From: Thomas Weise <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Monday, June 6, 2016 at 12:21 PM
> To: "[email protected]" <[email protected]>
> Subject: Re: avro deserialization fails when using kafka
>
> Since you are creating the decoder in setup(), please mark the decoder field
> transient. There is no need to checkpoint it.
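A minimal, stdlib-only sketch of this pattern, under stated assumptions: java.io serialization stands in for Apex's Kryo checkpointing (Kryo's default FieldSerializer, like Java serialization, skips transient fields), and FakeDecoder is a hypothetical stand-in for KafkaAvroDecoder. Only schemaRegURL survives the round trip; the decoder comes back null and is rebuilt when setup() runs again, as Apex does after restoring an operator.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class TransientDecoderDemo {

  // Hypothetical stand-in for KafkaAvroDecoder: no no-arg constructor, not Serializable.
  static class FakeDecoder {
    private final String registryUrl;

    FakeDecoder(Properties props) {
      this.registryUrl = props.getProperty("schema.registry.url");
    }

    String fromBytes(byte[] bytes) {
      return registryUrl + ":" + new String(bytes, StandardCharsets.UTF_8);
    }
  }

  // Simplified operator: only the configuration belongs in the checkpoint.
  static class ConversionOperator implements Serializable {
    private static final long serialVersionUID = 1L;
    private String schemaRegURL;            // checkpointed
    private transient FakeDecoder decoder;  // skipped on serialization, rebuilt in setup()

    ConversionOperator(String schemaRegURL) {
      this.schemaRegURL = schemaRegURL;
    }

    void setup() {
      Properties props = new Properties();
      props.setProperty("schema.registry.url", schemaRegURL);
      decoder = new FakeDecoder(props);
    }

    boolean hasDecoder() {
      return decoder != null;
    }

    String process(byte[] tuple) {
      return decoder.fromBytes(tuple);
    }
  }

  // Serialize and deserialize the operator, roughly like a checkpoint followed by a restore.
  static ConversionOperator roundTrip(ConversionOperator op)
      throws IOException, ClassNotFoundException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
      out.writeObject(op);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
      return (ConversionOperator) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    // Hypothetical registry URL, for illustration only
    ConversionOperator op = new ConversionOperator("http://localhost:8081");
    op.setup();

    ConversionOperator restored = roundTrip(op);
    System.out.println(restored.hasDecoder()); // false: the transient field was not restored

    restored.setup();                          // rebuild the decoder, as setup() would after a restore
    System.out.println(restored.process("hi".getBytes(StandardCharsets.UTF_8)));
  }
}
```

Without the transient modifier the failure mode differs by serializer: Java serialization rejects the field on write with NotSerializableException, whereas Kryo fails on restore because it cannot instantiate the decoder.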
>
> Thanks,
> Thomas
>
>
> On Mon, Jun 6, 2016 at 10:06 AM, Munagala Ramanath <[email protected]> wrote:
>
>>
>> http://docs.datatorrent.com/troubleshooting/#application-throwing-following-kryo-exception
>>
>> Please try the suggestions at the above link.
>>
>> It appears from
>>
>> https://github.com/uber/confluent-schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDecoder.java
>> that the class does not have a default constructor.
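The Kryo error quoted below says it cannot create KafkaAvroDecoder because the class has no no-arg constructor. As a rough plain-Java illustration of that condition (not Kryo's actual code path), this sketch checks reflectively whether a class declares a no-arg constructor; DecoderLike and HasNoArg are hypothetical stand-ins.

```java
import java.util.Properties;

public class NoArgCheck {

  // Hypothetical stand-in for KafkaAvroDecoder: only a one-argument constructor.
  static class DecoderLike {
    DecoderLike(Properties props) {
    }
  }

  // Hypothetical class that can be created without arguments.
  static class HasNoArg {
    HasNoArg() {
    }
  }

  // Returns true if the class declares a constructor that takes no arguments.
  static boolean hasNoArgConstructor(Class<?> type) {
    try {
      type.getDeclaredConstructor();
      return true;
    } catch (NoSuchMethodException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(hasNoArgConstructor(DecoderLike.class)); // false
    System.out.println(hasNoArgConstructor(HasNoArg.class));    // true
  }
}
```

Marking the decoder field transient sidesteps the problem, because the serializer then never has to instantiate the decoder at all.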
>>
>> Ram
>>
>> On Mon, Jun 6, 2016 at 9:40 AM, Raja.Aravapalli <[email protected]> wrote:
>>
>>>
>>> Hi,
>>>
>>> I am trying to read data from Kafka, and my input in Kafka is Avro
>>> messages.
>>>
>>> So I am using the class "KafkaSinglePortByteArrayInputOperator" to emit
>>> records from Kafka, and in the next operator I am reading the input as
>>> "byte[]" and deserializing the message.
>>>
>>> But the tuple deserialization is failing with the error below in the log.
>>>
>>> Can someone please share their thoughts and help me fix this?
>>>
>>>
>>> Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): io.confluent.kafka.serializers.KafkaAvroDecoder
>>> Serialization trace:
>>> decoder (com.tgt.mighty.apexapps.AvroBytesConversionOperator)
>>>     at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
>>>     at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
>>>     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>
>>>
>>>
>>> *Code FYR:*
>>>
>>>
>>> *Application.java* file:
>>>
>>> public void populateDAG(DAG dag, Configuration conf)
>>> {
>>>   //KafkaSinglePortStringInputOperator kafkaInput =
>>>   //    dag.addOperator("Kafka_Input", KafkaSinglePortStringInputOperator.class);
>>>
>>>   KafkaSinglePortByteArrayInputOperator kafkaInput =
>>>       dag.addOperator("Kafka_Input", new KafkaSinglePortByteArrayInputOperator());
>>>
>>>   AvroBytesConversionOperator avroConversion =
>>>       dag.addOperator("Avro_Convert", new AvroBytesConversionOperator("schemaRegURL"));
>>>
>>>   HDFSWrite hdfs = dag.addOperator("To_HDFS", HDFSWrite.class);
>>>
>>>   //dag.addStream("Kafka_To_Hdfs_Ingestion", kafkaInput.outputPort, hdfs.input);
>>>   dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort,
>>>       avroConversion.input);
>>>   dag.addStream("Avro_To_String_Stream", avroConversion.output, hdfs.input);
>>>
>>> }
>>>
>>>
>>> *Operator Code:*
>>>
>>> public class AvroBytesConversionOperator extends BaseOperator {
>>>
>>>   private String schemaRegURL;
>>>   private KafkaAvroDecoder decoder;
>>>
>>>   public AvroBytesConversionOperator(){
>>>
>>>   }
>>>
>>>   public AvroBytesConversionOperator(String schemaRegURL){
>>>     this.schemaRegURL = schemaRegURL;
>>>   }
>>>
>>>   /**
>>>    * Defines Input Port - DefaultInputPort
>>>    * Accepts data from the upstream operator
>>>    * Type byte[]
>>>    */
>>>   public transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>() {
>>>     @Override
>>>     public void process(byte[] tuple)
>>>     {
>>>       processTuple(tuple);
>>>     }
>>>   };
>>>
>>>
>>>   /**
>>>    * Defines Output Port - DefaultOutputPort
>>>    * Sends data to the downstream operator which can consume this data
>>>    * Type String
>>>    */
>>>   public transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
>>>
>>>
>>>   /**
>>>    * Setup call
>>>    */
>>>   @Override
>>>   public void setup(OperatorContext context)
>>>   {
>>>     Properties props = new Properties();
>>>     props.setProperty("schema.registry.url", this.schemaRegURL);
>>>     this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
>>>   }
>>>
>>>   /**
>>>    * Begin window call for the operator.
>>>    * @param windowId
>>>    */
>>>   public void beginWindow(long windowId)
>>>   {
>>>
>>>   }
>>>
>>>   /**
>>>    * Defines what should be done with each incoming tuple
>>>    */
>>>   protected void processTuple(byte[] tuple)
>>>   {
>>>     GenericRecord record = (GenericRecord) decoder.fromBytes(tuple);
>>>     output.emit(record.toString());
>>>   }
>>>
>>>   /**
>>>    * End window call for the operator
>>>    * If sending per window, emit the updated counts here.
>>>    */
>>>   @Override
>>>   public void endWindow()
>>>   {
>>>
>>>   }
>>>
>>> }
>>>
>>>
>>
>