Hi,
I am trying to read data from kafka, and my input in kafka is avro messages.
So I am using class “KafkaSinglePortByteArrayInputOperator” to emit records
from kafka.. And in the next operator I am reading input as "byte[]” and
deserializing the message!!
But the tuple deserialization is failing with below error in the log…
Can someone pls share your thoughts and help me fix this?
Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created
(missing no-arg constructor): io.confluent.kafka.serializers.KafkaAvroDecoder
Serialization trace:
decoder (com.tgt.mighty.apexapps.AvroBytesConversionOperator)
at
com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
Code FYR:
Application.java file:
public void populateDAG(DAG dag, Configuration conf)
{
//KafkaSinglePortStringInputOperator kafkaInput =
dag.addOperator("Kafka_Input", KafkaSinglePortStringInputOperator.class);
KafkaSinglePortByteArrayInputOperator kafkaInput =
dag.addOperator("Kafka_Input", new KafkaSinglePortByteArrayInputOperator());
AvroBytesConversionOperator avroConversion = dag.addOperator("Avro_Convert",
new AvroBytesConversionOperator(“schemaRegURL"));
HDFSWrite hdfs = dag.addOperator("To_HDFS", HDFSWrite.class);
//dag.addStream("Kafka_To_Hdfs_Ingestion", kafkaInput.outputPort, hdfs.input);
dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort,
avroConversion.input);
dag.addStream("Avro_To_String_Stream", avroConversion.output, hdfs.input);
}
Operator Code:
public class AvroBytesConversionOperator extends BaseOperator {
private String schemaRegURL;
private KafkaAvroDecoder decoder;
public AvroBytesConversionOperator(){
}
public AvroBytesConversionOperator(String schemaRegURL){
this.schemaRegURL = schemaRegURL;
}
/**
* Defines Input Port - DefaultInputPort
* Accepts data from the upstream operator
* Type byte[]
*/
public transient DefaultInputPort<byte[]> input = new
DefaultInputPort<byte[]>() {
@Override
public void process(byte[] tuple)
{
processTuple(tuple);
}
};
/**
* Defines Output Port - DefaultOutputPort
* Sends data to the down stream operator which can consume this data
* Type String
*/
public transient DefaultOutputPort<String> output = new
DefaultOutputPort<String>();
/**
* Setup call
*/
@Override
public void setup(OperatorContext context)
{
Properties props = new Properties();
props.setProperty("schema.registry.url", this.schemaRegURL);
this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
}
/**
* Begin window call for the operator.
* @param windowId
*/
public void beginWindow(long windowId)
{
}
/**
* Defines what should be done with each incoming tuple
*/
protected void processTuple(byte[] tuple)
{
GenericRecord record = (GenericRecord) decoder.fromBytes(tuple);
output.emit(record.toString());
}
/**
* End window call for the operator
* If sending per window, emit the updated counts here.
*/
@Override
public void endWindow()
{
}
}