Hi

I can't get the Kafka/Avro serializer producer example to work.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Created by clint on 3/17/15.
 */
public class Confluent {

    public static void  main (String[] args){

        KafkaProducer<Object, Object> producer;
        Properties propsKafka = new Properties();

        propsKafka.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
        propsKafka.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        propsKafka.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        propsKafka.put("schema.registry.url", "http://localhost:8081";);
        producer = new KafkaProducer<Object, Object>(propsKafka);

        String key = "key1";
        String userSchema = "{\"type\":\"record\"," +
                "\"name\":\"myrecord\"," +
                "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";

        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(userSchema);

        GenericRecord avroRecord = new GenericData.Record(schema);
        avroRecord.put("f1", "value4");

        ProducerRecord<Object, Object> data = new ProducerRecord<Object,
Object>("test", key , avroRecord);
        producer.send(data);

    }
}


The output is:

Mar 17, 2015 5:00:31 PM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ProducerConfig values:
    compression.type = none
    metric.reporters = []
    metadata.max.age.ms = 300000
    metadata.fetch.timeout.ms = 60000
    acks = 1
    batch.size = 16384
    reconnect.backoff.ms = 10
    bootstrap.servers = [localhost:9092]
    receive.buffer.bytes = 32768
    retry.backoff.ms = 100
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class io.confluent.kafka.serializers.KafkaAvroSerializer
    retries = 0
    max.request.size = 1048576
    block.on.buffer.full = true
    value.serializer = class io.confluent.kafka.serializers.KafkaAvroSerializer
    metrics.sample.window.ms = 30000
    send.buffer.bytes = 131072
    max.in.flight.requests.per.connection = 5
    metrics.num.samples = 2
    linger.ms = 0
    client.id =

Mar 17, 2015 5:00:32 PM org.apache.kafka.common.config.AbstractConfig logUnused
WARNING: The configuration schema.registry.url = null was supplied but isn't a known config.

Please help

Thanks

Reply via email to