It all worked out, at least for the producer, which for now was enough.
I only had to use a slight work-around to pass the ‘schema.registry.url’
property to the (Confluent) avroEncoder, since this is not a property of Kafka
itself. As it is easy to work around, I don’t think this needs to be
changed.
I also noticed that for both the ‘serializerClass’ and the ‘keySerializerClass’ a
serializer is now needed, which is what I would expect, but in the released
snapshot an encoder was still needed.
I read on the site that 2.17 is due to be released soon. Is this indeed the case?
Here is the main part of the code used:
public void configure() {
    // here is an example of some XML files getting unmarshalled to an Avro
    // object and sent to Kafka
    from("file:src/kafkadata?noop=true")
        .unmarshal(getCastor())
        .setHeader(KafkaConstants.KEY, body().method("getUser"))
        //.marshal(getAvro())
        .to("kafka:192.168.99.100:9093?topic=" + TOPIC + "&"
            + "requestRequiredAcks=-1&"
            + "serializerClass=" + CamelKafkaAvroEncoder.class.getName() + "&"
            + "keySerializerClass=" + StringSerializer.class.getName() + "&"
            + "brokers=192.168.99.100:9093&"
            + "securityProtocol=SSL&"
            + "sslKeyPassword=notsecret&"
            + "sslKeystoreLocation=/client/client.keystore.jks&"
            + "sslKeystorePassword=notsecret&"
            + "sslTruststoreLocation=/client/client.truststore.jks&"
            + "sslTruststorePassword=notsecret");
}
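For comparison, here is a minimal sketch of the plain Kafka Java client properties that I would expect the SSL options in the route above to correspond to. The property keys are the standard Kafka client ones; the mapping from the Camel option names to those keys is my assumption and not checked against the camel-kafka source, and the class name SslProducerProps is made up for this example.

```java
import java.util.Properties;

public class SslProducerProps {

    /** Builds producer properties mirroring the SSL options used in the route. */
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.99.100:9093");
        // "all" has the same meaning as -1 for the Kafka producer
        props.put("acks", "all");
        props.put("security.protocol", "SSL");
        props.put("ssl.key.password", "notsecret");
        props.put("ssl.keystore.location", "/client/client.keystore.jks");
        props.put("ssl.keystore.password", "notsecret");
        props.put("ssl.truststore.location", "/client/client.truststore.jks");
        props.put("ssl.truststore.password", "notsecret");
        return props;
    }

    public static void main(String[] args) {
        // Print only the keys, so no passwords end up in a log
        build().stringPropertyNames().stream().sorted().forEach(System.out::println);
    }
}
```

Key and value serializers are left out on purpose, since those depend on the Avro setup.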
And the work-around for the ‘schema.registry.url’ property (with the default
class it’s taken from the configs):
import java.util.Map;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer;
import org.apache.kafka.common.serialization.Serializer;

/**
 * needed because the schema registry url can't be passed to camel
 */
public class CamelKafkaAvroEncoder extends AbstractKafkaAvroSerializer
        implements Serializer<Object> {

    private static final String SCHEMA_REGISTRY_URL =
        "http://192.168.99.100:8081";
    // subject under which the value schema is registered
    private static final String SCHEMA_KEY =
        getSubjectName(XmlKafkaBuilder.TOPIC, false);

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // use the configured schema limit if present, else the default
        Object maxSchemaObject = configs.get(
            AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_CONFIG);
        if (maxSchemaObject == null) {
            schemaRegistry = new CachedSchemaRegistryClient(
                SCHEMA_REGISTRY_URL,
                AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
        } else {
            schemaRegistry = new CachedSchemaRegistryClient(
                SCHEMA_REGISTRY_URL, (Integer) maxSchemaObject);
        }
    }

    @Override
    public byte[] serialize(String topic, Object record) {
        return serializeImpl(SCHEMA_KEY, record);
    }

    @Override
    public void close() {
    }
}
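If hard-coding the registry URL turns out to be too rigid, one option would be to look it up with a fallback chain: first the configs map, then a JVM system property, then the hard-coded default. This is only a sketch under my own assumptions; the class RegistryUrlResolver and the use of a system property are made up for illustration and are not part of Camel or the Confluent library.

```java
import java.util.HashMap;
import java.util.Map;

public class RegistryUrlResolver {

    static final String CONFIG_KEY = "schema.registry.url";
    // Same default as the constant in the encoder above
    static final String DEFAULT_URL = "http://192.168.99.100:8081";

    /** Returns the URL from configs, else the system property, else the default. */
    public static String resolve(Map<String, ?> configs) {
        Object fromConfigs = configs.get(CONFIG_KEY);
        if (fromConfigs != null) {
            return fromConfigs.toString();
        }
        return System.getProperty(CONFIG_KEY, DEFAULT_URL);
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        System.out.println(resolve(configs)); // no entry in configs: falls back
        configs.put(CONFIG_KEY, "http://localhost:8081");
        System.out.println(resolve(configs)); // entry in configs wins
    }
}
```

In configure(), the SCHEMA_REGISTRY_URL constant could then be swapped for RegistryUrlResolver.resolve(configs), so the URL can be overridden with -Dschema.registry.url=... without recompiling.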
> On 16 Mar 2016, at 17:03, Claus Ibsen wrote:
>
> Hi Gerard
>
> Okay sounds good. Let us know how it goes. Yeah Camel 2.17 has
> migrated camel-kafka to use their new java library. So there is likely
> a bunch of changes.
>
> And if you find something not being able to configure SSL etc then let
> us know. And maybe you can find out how to setup SSL with their java
> library, and see if we miss anything in camel-kafka to allow that.
>
> On Wed, Mar 16, 2016 at 1:35 PM, Gerard Klijs wrote:
>> I'm now in the process of building the project from git, and I noticed it's
>> already using the new consumer and properties. I will try it out to see if
>> it works for me.
>>
>> On Wed, Mar 16, 2016 at 1:18 PM Gerard Klijs wrote:
>>
>>> The current version of the camel-kafka component already has the latest
>>> released (0.9.0.1) kafka client included. To be able to use one of the new
>>> features, mutual ssl authentication with the broker, the
>>> current camel-kafka component needs some changes:
>>> - Start using the new kafka consumer. This changes a lot: different
>>> properties are needed to create the connection, and a different mechanism
>>> is used to subscribe to topics, and to get them.
>>> - Allow at least 5 (preferably all) properties having to do with the ssl
>>> configuration to be used for the ssl connection, both by the consumer and
>>> the producer.
>>>
>>> I was wondering if anyone is thinking about doing this. We are currently
>>> considering camel to connect non-java applications to our kafka cluster. We
>>> are required to use ssl, both for the encryption, and for use with the
>>> SimpleAclAuthorizer. It might be an option for us to do it ourselves and
>>> submit the patch.
>>>
>>> I just started to take a look at camel, used it to read an xml, and send
>>> it to kafka as avro, using the confluent schema registry.
>>>
>
>
>
> --
> Claus Ibsen
> -----------------
> http://davsclaus.com @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2