You can use `ByteArrayDeserializer` to get `byte[]` key and value from
the consumer and deserialize both "manually" using a KafkaAvroDeserializer
that is created with the MockSchemaRegistry. `ConsumerRecord` also gives
you the topic name for each record via #topic().
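
Roughly something like this (only a sketch, assuming spring-kafka-test's
KafkaTestUtils and `embeddedKafka` as in your snippet, that `schemaRegistry`
and `defaultConfig` are the MockSchemaRegistryClient and config from your
test, that `Log` is your Avro-generated class, and that doWork() is the
method from your pipeline):

Map<String, Object> consumerProps =
    KafkaTestUtils.consumerProps("testGroup", "true", embeddedKafka);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    ByteArrayDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    ByteArrayDeserializer.class);

KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList("test2"));

// deserializer backed by the same MockSchemaRegistryClient as the serializer;
// "specific.avro.reader" makes it return Log instead of a GenericRecord
Map deserConfig = new HashMap(defaultConfig);
deserConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
KafkaAvroDeserializer avroDeserializer =
    new KafkaAvroDeserializer(schemaRegistry, deserConfig);

ConsumerRecords<byte[], byte[]> records = consumer.poll(5000);
for (ConsumerRecord<byte[], byte[]> record : records) {
    // #topic() gives you the topic the record came from
    Log value = (Log) avroDeserializer.deserialize(record.topic(), record.value());
    doWork(value);
}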
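For the producer side, the idea is the same as in my previous mail below:
serialize the value "manually" with the KafkaAvroSerializer that was created
against the MockSchemaRegistryClient, and configure the producer with
ByteArraySerializer. Again only a sketch; because you set
"auto.register.schemas" to false, you also need to register the schema with
the mock client yourself:

// register the schema up front, as auto.register.schemas is disabled
schemaRegistry.register("test2-value", Log.getClassSchema());

Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    StringSerializer.class);
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    ByteArraySerializer.class);

KafkaProducer<String, byte[]> producer = new KafkaProducer<>(senderProps);

// serialize outside of the producer, using the mock-registry-backed serializer
byte[] valueBytes = avroSerializer.serialize("test2", log);
producer.send(new ProducerRecord<>("test2", "1", valueBytes)).get();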

-Matthias

On 10/29/18 4:03 PM, chinchu chinchu wrote:
> Thanks Matthias. How do I deal with this in the case of a consumer that
> expects a specific record type? I am trying to write an integration test
> for the scenario below. All of these use an Avro object as the value of
> the producer record. I am kind of able to write this for simple data
> types but have no luck with Avro.
> 
> Produce record > topic > consumer > doWork() > produce.
> 
> On Mon, Oct 29, 2018 at 4:50 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> You set:
>>
>>> senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>>>     avroSerializer.getClass());
>>
>> This will tell the Producer to create a new KafkaAvroSerializer object,
>> and this object expects "schema.registry.url" to be set during
>> initialization, ie, you need to add the config to `senderProps`.
>>
>> However, this will overall not give you what you want IMHO, as the
>> Producer won't use the MockSchemaRegistry in this case.
>>
>> You would need to use
>>
>>> avroSerializer = new KafkaAvroSerializer(schemaRegistry,
>>>     new HashMap(defaultConfig));
>>
>> outside of the producer in your tests (ie, serialize the data
>> "manually") and pass `byte[]/byte[]` records into the producer itself.
>>
>>
>> -Matthias
>>
>> On 10/29/18 2:56 PM, chinchu chinchu wrote:
>>> Hey folks,
>>> I am getting the below exception when using a MockSchemaRegistry in a
>>> JUnit test. I appreciate your help. I am using Confluent 4.0.
>>>
>>>
>>>   private SchemaRegistryClient schemaRegistry;
>>>   private KafkaAvroSerializer avroSerializer;
>>>
>>>   Properties defaultConfig = new Properties();
>>>   defaultConfig.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>>       "http://fake-url");
>>>   defaultConfig.put("auto.register.schemas", false);
>>>   schemaRegistry = new MockSchemaRegistryClient();
>>>   avroSerializer = new KafkaAvroSerializer(schemaRegistry,
>>>       new HashMap(defaultConfig));
>>>
>>>   Map<String, Object> senderProps =
>>>       KafkaTestUtils.producerProps(embeddedKafka);
>>>   senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>>>       StringSerializer.class);
>>>   senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>>>       avroSerializer.getClass());
>>>
>>>   KafkaProducer<String, Log> producer =
>>>       new KafkaProducer<String, Log>(senderProps);
>>>   ProducerRecord<String, Log> record =
>>>       new ProducerRecord<String, Log>("test2", "1", log);
>>>   try {
>>>     producer.send(record).get();
>>>   } catch (InterruptedException e) {
>>>     // TODO Auto-generated catch block
>>>     e.printStackTrace();
>>>   }
>>>
>>> Error
>>> ==============
>>> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>>>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:441)
>>>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:268)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>>>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>>>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>>>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>>>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>>>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>>>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>>>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>>>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>>>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>>>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>>>     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>>>     at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
>>>     at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
>>>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:538)
>>>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:760)
>>>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:460)
>>>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:206)
>>> Caused by: io.confluent.common.config.ConfigException: Missing required
>>> configuration "schema.registry.url" which has no default value.
>>>     at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
>>>     at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
>>>     at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:61)
>>>     at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
>>>     at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
>>>     at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.configure(ExtendedSerializer.java:60)
>>>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:360)
>>>     ... 28 more
>>>
>>
>>
> 
