Thanks to Gwen and Tommy Becker for their helpful replies.

Currently, the environment I need to work with doesn't use the Schema Registry; 
hopefully one day it will, but for now that's not an option.  Events are written 
to Kafka without the schema embedded, and each side of the interface assumes a 
given schema, with the consequent risks accepted.

To Avro-encode a SpecificRecord for use with the 
org.apache.kafka.connect.storage.Converter interface (in the absence of access 
to the Confluent implementation classes), I was thinking of something along 
these lines:

    private byte[] toAvro(Schema schema, SpecificRecord record) throws IOException {
        SpecificDatumWriter<SpecificRecord> writer = new SpecificDatumWriter<>(schema);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // binaryEncoder() writes the record without embedding the schema
        BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(baos, null);
        writer.write(record, binaryEncoder);
        binaryEncoder.flush();  // flush the encoder's buffer before reading the bytes
        return baos.toByteArray();
    }

To work with Kafka Connect I need to comply with the 
org.apache.kafka.connect.storage.Converter interface, which defines the 
following methods:

void configure(Map<String, ?> configs, boolean isKey);
byte[] fromConnectData(String topic, Schema schema, Object value);
SchemaAndValue toConnectData(String topic, byte[] value);

Is it safe to provide a no-op implementation for configure()?
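
To make that concrete, the skeleton I have in mind is roughly the following 
(the class name and error handling are my own invention; it would reuse the 
toAvro() helper above, and I've left the sink-side method unimplemented for now):

    import java.io.IOException;
    import java.util.Map;

    import org.apache.avro.specific.SpecificRecord;
    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.errors.DataException;
    import org.apache.kafka.connect.storage.Converter;

    public class FixedSchemaAvroConverter implements Converter {

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // no-op: nothing to configure without a Schema Registry
        }

        @Override
        public byte[] fromConnectData(String topic,
                org.apache.kafka.connect.data.Schema connectSchema, Object value) {
            try {
                // the source task supplies Avro SpecificRecords as values
                SpecificRecord record = (SpecificRecord) value;
                return toAvro(record.getSchema(), record);  // toAvro() as defined above
            } catch (IOException e) {
                throw new DataException("Avro encoding failed for topic " + topic, e);
            }
        }

        @Override
        public SchemaAndValue toConnectData(String topic, byte[] value) {
            // sink direction - see the SpecificDatumReader point below
            throw new UnsupportedOperationException("Not needed for a source-only connector");
        }
    }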

The toConnectData() method will presumably be implemented via a corresponding 
SpecificDatumReader.
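
Something like this is what I had in mind for the reading side (the method name 
is mine; it assumes the reader uses the same generated schema the writer used):

    private SpecificRecord fromAvro(Schema schema, byte[] bytes) throws IOException {
        // assumes reader and writer share the same schema (no schema evolution handling)
        SpecificDatumReader<SpecificRecord> reader = new SpecificDatumReader<>(schema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return reader.read(null, decoder);
    }

(Mapping the decoded record onto a Connect SchemaAndValue would still need to be 
worked out.)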

Does this look like a reasonable approach?

Many thanks if you've read this far!

Regards,
David


-----Original Message-----
From: Gwen Shapira [mailto:g...@confluent.io] 
Sent: 02 November 2016 21:18
To: dev@kafka.apache.org
Subject: Re: Kafka Connect key.converter and value.converter properties for 
Avro encoding

Both the Confluent Avro Converter and the Confluent Avro Serializer use the 
Schema Registry. The reason is, as Tommy Becker mentioned below, to avoid 
storing the entire schema in each record (which the JSON serializer in Apache 
Kafka does). It has a few other benefits too, such as schema validation.

If you are interested in trying this approach, you will want to use the 
Converter, since it was written specifically to integrate with Connect.
If you prefer another approach, without the Schema Registry, you can write your 
own Converter - that's why we made them pluggable. Feel free to copy ours and 
modify it to fit your Avro approach.

Gwen

On Wed, Nov 2, 2016 at 2:48 AM, <david.frank...@bt.com> wrote:

> I am using Kafka Connect in source mode i.e. using it to send events 
> to Kafka topics.
>
> With the key.converter and value.converter properties set to 
> org.apache.kafka.connect.storage.StringConverter I can attach a 
> consumer to the topics and see the events in a readable form.  This is 
> helpful and reassuring but it is not the desired representation for my 
> downstream consumers - these require the events to be Avro encoded.
>
> It seems that to write the events to Kafka Avro encoded, these 
> properties need to be set to 
> io.confluent.kafka.serializers.KafkaAvroSerializer.  Is this correct?
>
> I am not using the Confluent platform, merely the standard Kafka 0.10 
> download, and have been unable to find out how to get these from a 
> Maven repository jar.  
> http://docs.confluent.io/3.0.0/app-development.html#java
> suggests that these are available via:
>
>      <dependency>
>          <groupId>io.confluent</groupId>
>          <artifactId>kafka-avro-serializer</artifactId>
>          <version>3.0.0</version>
>      </dependency>
>
> But that doesn't appear to be the case.  The class exists in
> https://raw.githubusercontent.com/confluentinc/schema-registry/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java
> but this seems to use the Schema Registry, which is something I'd rather avoid.
>
> I'd be grateful for any pointers on the simplest way of getting Avro 
> encoded events written to Kafka from a Kafka Connect source connector/task.
>
> Also in the task which creates SourceRecords, I'm choosing 
> Schema.BYTES_SCHEMA for the 4th arg in the constructor.  But I'm not 
> clear what this achieves - some light shed on that would also be helpful.
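>
> For reference, the call in the task is along these lines, assuming the 
> five-argument constructor (the names here are just placeholders):
>
>     SourceRecord record = new SourceRecord(
>             sourcePartition,       // Map<String, ?> identifying the source
>             sourceOffset,          // Map<String, ?> giving the position within the source
>             "my-topic",            // destination topic (placeholder)
>             Schema.BYTES_SCHEMA,   // 4th arg: the Connect schema describing the value
>             payloadBytes);         // the value itself, a byte[] to match BYTES_SCHEMA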
>
> Many thanks,
> David
>



--
*Gwen Shapira*
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter <https://twitter.com/ConfluentInc> | blog 
<http://www.confluent.io/blog>
