Hello, I am having trouble passing an Avro message through Kafka (0.8) and
am trying to understand why.
From what I see, the producer tries to create an instance of the encoder but
fails because it cannot find the constructor, although it is there.

Here's the code and subsequent error. Appreciate any help!

Thank you,
Brenden
----

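import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.message.Message;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;

// User is the Avro-generated record class (its import is omitted here).
public class AvroProducer {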

    //    public final String zkConnection = "tlvwhale1:2181,tlvwhale2:2181,tlvwhale3:2181";
    public final String zkConnection = "localhost:2181";
    public final String brokerList = "localhost:9092, localhost:9093, localhost:9094";
    public final String topic = "cdrTopic";

    public static void main(String args[]){
        AvroProducer avroProducer = new AvroProducer();
        try {
//            avroProducer.testGenericRecord();
            avroProducer.sendCDRAvroMessage();
        } catch (Exception e) {
            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
        }
    }

    private void sendCDRAvroMessage() throws IOException {
        User user1 = new User();
        user1.setName("Brenden");
        user1.setFavoriteNumber(256);
        Properties props = new Properties();
        props.put("zk.connect", zkConnection);
        props.put("metadata.broker.list", brokerList);
//        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        props.put("serializer.class", "org.apache.avro.io.BufferedBinaryEncoder");
//        props.put("serializer.class", "org.springframework.integration.kafka.serializer.avro.AvroSpecificDatumBackedKafkaEncoder");
//        props.put("serializer.class", "org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder");

//        Producer<String, Message> producer = new Producer<String, Message>(new ProducerConfig(props));
        Producer<String, Message> producer = new Producer<String, Message>(new ProducerConfig(props));


        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
//        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
//        Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        EncoderFactory factory = new EncoderFactory().configureBufferSize(4096);
        Encoder encoder = factory.directBinaryEncoder(out, null);

        userDatumWriter.write(user1, encoder);
        encoder.flush();
        out.close();
        Message message = new Message(out.toByteArray());

        producer.send(new KeyedMessage<String, Message>(topic, message));
//        producer.send(new KeyedMessage<String, Message>(topic, null, message));
    }

}

---
The Error stack:
...
Message(magic = 0, attributes = 0, crc = 2755187525, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=12 cap=12])
Exception in thread "main" java.lang.NoSuchMethodException: org.apache.avro.io.DirectBinaryEncoder.<init>(kafka.utils.VerifiableProperties)
    at java.lang.Class.getConstructor0(Class.java:2810)
    at java.lang.Class.getConstructor(Class.java:1718)
    at kafka.utils.Utils$.createObject(Utils.scala:458)
    at kafka.producer.Producer.<init>(Producer.scala:60)
    at kafka.javaapi.producer.Producer.<init>(Producer.scala:25)
    at com.humca.swizzle.kafka.producer.KafkaProducer.sendAvroMessage(KafkaProducer.java:74)
    at com.humca.swizzle.kafka.producer.KafkaProducer.main(KafkaProducer.java:88)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
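
The trace above suggests that the producer looks up the configured
serializer.class reflectively and asks for a public constructor taking a
single kafka.utils.VerifiableProperties argument. A minimal sketch of that
kind of lookup, using only the JDK, might look like the following. Props,
GoodEncoder, BadEncoder and hasPropsConstructor are hypothetical names made up
for illustration; Props merely stands in for kafka.utils.VerifiableProperties,
and none of this is Kafka's actual code.

```java
import java.lang.reflect.Constructor;

public class EncoderLookupSketch {

    /** Hypothetical stand-in for kafka.utils.VerifiableProperties. */
    public static class Props {
    }

    /** Declares the one-argument constructor the reflective lookup asks for. */
    public static class GoodEncoder {
        public GoodEncoder(Props props) {
        }
    }

    /** Declares only a no-argument constructor, so the lookup fails. */
    public static class BadEncoder {
        public BadEncoder() {
        }
    }

    /**
     * Loads the class by name and reports whether it has a public
     * constructor taking exactly one Props argument.
     */
    public static boolean hasPropsConstructor(String className)
            throws ClassNotFoundException {
        Class<?> clazz = Class.forName(className);
        try {
            Constructor<?> ctor = clazz.getConstructor(Props.class);
            return ctor != null;
        } catch (NoSuchMethodException e) {
            // Same exception type as in the stack trace above.
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        // prints "true"
        System.out.println(hasPropsConstructor(GoodEncoder.class.getName()));
        // prints "false"
        System.out.println(hasPropsConstructor(BadEncoder.class.getName()));
    }
}
```

If that reading is right, serializer.class would have to name a class that
implements kafka.serializer.Encoder and declares such a constructor, rather
than one of Avro's own encoder classes.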
