Hi

Thanks for sharing how to do this.

On Wed, Mar 16, 2016 at 6:18 PM, Gerard Klijs <gerard.kl...@dizzit.com> wrote:
> It all worked out, at least for the producer, which for now was enough.
> I only had to use a slight work-around to pass the ‘schema.registry.url’
> property to the (confluent) avroEncoder, since this is not a property of
> kafka itself. As it is easy to work around, I don’t think this needs to be
> changed.
> I also noticed that for both the ‘serializerClass’ and the ‘keySerializerClass’
> a serializer is now needed, which is what I would expect, whereas in the
> released version an encoder was still needed.
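A rough sketch of the interface the new Java client expects for a serializer (the class name here is just a placeholder, not something from the project):

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

// with the old Scala client this would have been a kafka.serializer.Encoder;
// the 0.9 Java client instead expects configure/serialize/close
public class ExampleStringSerializer implements Serializer<String> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure for this example
    }

    @Override
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
    }
}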
>
> I read on the site that 2.17 is due to be released soon; is this indeed the case?
>
> Here is the main part of the code used:
>
> public void configure() {
>
>     // here is an example of some XML files getting unmarshalled to an Avro
>     // object and sent to Kafka
>     from("file:src/kafkadata?noop=true")
>         .unmarshal(getCastor())
>         .setHeader(KafkaConstants.KEY, body().method("getUser"))
>         //.marshal(getAvro())
>         .to("kafka:192.168.99.100:9093?topic=" + TOPIC + "&"
>                 + "requestRequiredAcks=-1&"
>                 + "serializerClass="+CamelKafkaAvroEncoder.class.getName()+"&"
>                 + "keySerializerClass="+ StringSerializer.class.getName()+"&"
>                 + "brokers=192.168.99.100:9093&"
>                 + "securityProtocol=SSL&"
>                 + "sslKeyPassword=notsecret&"
>                 + "sslKeystoreLocation=/client/client.keystore.jks&"
>                 + "sslKeystorePassword=notsecret&"
>                 + "sslTruststoreLocation=/client/client.truststore.jks&"
>                 + "sslTruststorePassword=notsecret");
> }
> And the work-around for the ‘schema.registry.url’ property (with the default 
> class it’s taken from the configs):
>
> /**
>  * needed because the schema registry url can't be passed to camel
>  */
> public class CamelKafkaAvroEncoder extends AbstractKafkaAvroSerializer
>         implements Serializer<Object> {
>
>     private static final String SCHEMA_REGISTRY_URL =
>             "http://192.168.99.100:8081";
>     private static final String SCHEMA_KEY =
>             getSubjectName(XmlKafkaBuilder.TOPIC, false);
>
>     @Override
>     public void configure(Map<String, ?> configs, boolean isKey) {
>         Object maxSchemaObject = configs.get(
>                 AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_CONFIG);
>         if (maxSchemaObject == null) {
>             schemaRegistry = new CachedSchemaRegistryClient(
>                     SCHEMA_REGISTRY_URL,
>                     AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>         } else {
>             schemaRegistry = new CachedSchemaRegistryClient(
>                     SCHEMA_REGISTRY_URL, (Integer) maxSchemaObject);
>         }
>     }
>
>     @Override
>     public byte[] serialize(String topic, Object record) {
>         return serializeImpl(SCHEMA_KEY, record);
>     }
>
>     @Override
>     public void close() {
>     }
> }
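For comparison, a rough sketch of how the stock Confluent serializer would normally pick the registry URL up from the producer configs, which is the part that currently cannot be passed through the Camel endpoint; the class and property names are the Kafka/Confluent ones, the addresses are simply the ones used above:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

public class PlainConfluentProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.99.100:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // the stock Confluent serializer; its configure() reads "schema.registry.url"
        // from these same producer configs, so no hard-coded URL is needed
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://192.168.99.100:8081");

        KafkaProducer<String, Object> producer = new KafkaProducer<>(props);
        producer.close();
    }
}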
>
>
>> On 16 Mar 2016, at 17:03, Claus Ibsen <claus.ib...@gmail.com> wrote:
>>
>> Hi Gerard
>>
>> Okay sounds good. Let us know how it goes. Yeah Camel 2.17 has
>> migrated camel-kafka to use their new java library. So there is likely
>> a bunch of changes.
>>
>> And if you find something that cannot be configured for SSL etc. then let
>> us know. And maybe you can find out how to set up SSL with their java
>> library, and see if we miss anything in camel-kafka to allow that.
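For reference, a rough sketch of what the SSL setup looks like with the plain 0.9 Java producer; the property names are the Kafka client's own, and the paths and passwords are simply the ones from the route above:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

public class SslProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.99.100:9093");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // the SSL settings the Java client understands; these are the ones
        // camel-kafka would need to expose as endpoint options
        props.put("security.protocol", "SSL");
        props.put("ssl.keystore.location", "/client/client.keystore.jks");
        props.put("ssl.keystore.password", "notsecret");
        props.put("ssl.key.password", "notsecret");
        props.put("ssl.truststore.location", "/client/client.truststore.jks");
        props.put("ssl.truststore.password", "notsecret");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.close();
    }
}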
>>
>> On Wed, Mar 16, 2016 at 1:35 PM, Gerard Klijs <gerard.kl...@dizzit.com> 
>> wrote:
>>> I'm now in the process of building the project from git, and I noticed it's
>>> already using the new consumer and properties. I will try it out to see if
>>> it works for me.
>>>
>>> On Wed, Mar 16, 2016 at 1:18 PM Gerard Klijs <gerard.kl...@dizzit.com>
>>> wrote:
>>>
>>>> The current version of the camel-kafka component already has the latest
>>>> released (0.9.0.1) kafka client included. To be able to use one of the new
>>>> features, mutual ssl authentication with the broker, the
>>>> current camel-kafka component needs some changes:
>>>> - Start using the new kafka consumer; this changes a lot: different
>>>> properties are needed to create the connection, and a different mechanism
>>>> is used to subscribe to topics and to read the records (see the sketch
>>>> after this list).
>>>> - Allow at least 5 (preferably all) properties having to do with the ssl
>>>> configuration to be used for the ssl connection, both by the consumer and
>>>> the producer.
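A rough sketch of the subscribe/poll model the new 0.9 consumer uses (the topic name and group id are only placeholders):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NewConsumerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.99.100:9093");
        props.put("group.id", "camel-test");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        // the new consumer subscribes to topics and pulls records with poll(),
        // instead of the old high-level consumer's stream/iterator model
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.key() + " -> " + record.value());
        }
        consumer.close();
    }
}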
>>>>
>>>> I was wondering if anyone is thinking about doing this. We are currently
>>>> considering using camel to connect non-java applications to our kafka cluster.
>>>> We are required to use ssl, both for the encryption and for use with the
>>>> SimpleAclAuthorizer. It might be an option for us to do it ourselves and
>>>> submit the patch.
>>>>
>>>> I just started to take a look at camel, used it to read an xml, and send
>>>> it to kafka as avro, using the confluent schema registry.
>>>>
>>
>>
>>
>> --
>> Claus Ibsen
>> -----------------
>> http://davsclaus.com @davsclaus
>> Camel in Action 2: https://www.manning.com/ibsen2
>



-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2
