Hi Thanks for sharing how to do this.
On Wed, Mar 16, 2016 at 6:18 PM, Gerard Klijs <gerard.kl...@dizzit.com> wrote: > It all worked out, at least for the producer, which for now was enough. > I only had to use a slight work-around to pas the ‘schema.registry.url’ > property to the (confluent) avroEncoder, since this is not a property of > kafka itself, and it is easy to work around this, I don’t think this needs to > be changed. > I also noticed for both the ‘serializerClass’ and the ‘keySerializerClass’ a > encoder is now needed, which is what I would expect, but in the released > snapshot there was still an encoder needed. > > I read on the site 2.17 is due to be released soon, is this indeed the case? > > Here the main part of the code used: > > public void configure() { > > // here is an example of some xml's getting unmarshalled to a avro object > and send to kafka > from("file:src/kafkadata?noop=true") > .unmarshal(getCastor()) > .setHeader(KafkaConstants.KEY,body().method("getUser")) > //.marshal(getAvro()) > .to("kafka:192.168.99.100:9093?topic=" + TOPIC + "&" > + "requestRequiredAcks=-1&" > + "serializerClass="+CamelKafkaAvroEncoder.class.getName()+"&" > + "keySerializerClass="+ StringSerializer.class.getName()+"&" > + "brokers=192.168.99.100:9093&" > + "securityProtocol=SSL&" > + "sslKeyPassword=notsecret&" > + "sslKeystoreLocation=/client/client.keystore.jks&" > + "sslKeystorePassword=notsecret&" > + "sslTruststoreLocation=/client/client.truststore.jks&" > + "sslTruststorePassword=notsecret"); > } > And the work-around for the ‘schema.registry.url’ property (with the default > class it’s taken from the configs): > > /** > * needed because the schema registry url can't be passed to camel > */ > public class CamelKafkaAvroEncoder extends AbstractKafkaAvroSerializer > implements Serializer<Object> { > > private static final String SCHEMA_REGISTRY_URL = > "http://192.168.99.100:8081"; > private static final String SCHEMA_KEY = > getSubjectName(XmlKafkaBuilder.TOPIC,false); > > @Override > public void configure(Map<String, ?> configs, boolean isKey) { > Object maxSchemaObject = configs.get( > AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_CONFIG); > if (maxSchemaObject == null) { > schemaRegistry = new CachedSchemaRegistryClient( > SCHEMA_REGISTRY_URL, > AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT); > } else { > schemaRegistry = new CachedSchemaRegistryClient( > SCHEMA_REGISTRY_URL, (Integer) maxSchemaObject); > } > } > > @Override > public byte[] serialize(String topic, Object record) { > return serializeImpl(SCHEMA_KEY, record); > } > > @Override > public void close() { > } > } > > >> On 16 Mar 2016, at 17:03, Claus Ibsen <claus.ib...@gmail.com> wrote: >> >> Hi Gerard >> >> Okay sounds good. Let us know how it goes. Yeah Camel 2.17 has >> migrated camel-kafka to use their new java library. So there is likely >> a bunch of changes. >> >> And if you find something not being able to configure SSL etc then let >> us know. And maybe you can find out how to setup SSL with their java >> library, and see if we miss anything in camel-kafka to allow that. >> >> On Wed, Mar 16, 2016 at 1:35 PM, Gerard Klijs <gerard.kl...@dizzit.com> >> wrote: >>> I'm now in the process of building the project from git, and I noticed it's >>> already using the new consumer and properties. I will try it out to see if >>> it works for me. >>> >>> On Wed, Mar 16, 2016 at 1:18 PM Gerard Klijs <gerard.kl...@dizzit.com> >>> wrote: >>> >>>> The current version of the camel-kafka component already has the latest >>>> released (0.9.0.1) kafka client included. To be able to use one of the new >>>> features, mutual ssl authentication with the broker, the >>>> current camel-kafka component needs some changes: >>>> - Start using the new kafka consumer (), this changes a lot, different >>>> properties are needed to create the connection, and a different mechanism >>>> is used to subscribe to topics, and to get them. >>>> - Allow at least 5 (preferably all) properties having to do with the ssl >>>> configuration to be used for the ssl connection, both by the consumer and >>>> the producer. >>>> >>>> I was wondering if anyone is thinking about doing this. We are currently >>>> considering camel to connect non-java applications to our kafka cluster. We >>>> are required to use ssl, both for the encryption, and for use with the >>>> SimpleAclAuthorizer. It might be an option for us to do it ourselves and >>>> summit the patch. >>>> >>>> I just started to take a look at camel, used it to read an xml, and send >>>> it to kafka as avro, using the confluent schema registry. >>>> >> >> >> >> -- >> Claus Ibsen >> ----------------- >> http://davsclaus.com @davsclaus >> Camel in Action 2: https://www.manning.com/ibsen2 > -- Claus Ibsen ----------------- http://davsclaus.com @davsclaus Camel in Action 2: https://www.manning.com/ibsen2