You can use `ByteArrayDeserializer` to get the key and value as `byte[]` from the consumer and deserialize both "manually" using an Avro deserializer (`KafkaAvroDeserializer`) that is created with the `MockSchemaRegistryClient`. `ConsumerRecord` also gives you the topic name for each record via `#topic()`.
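A minimal sketch of that idea, assuming kafka-clients and the Confluent Avro serializer are on the test classpath. The class name `ManualAvroDecoding`, the helper names, and the plain String payload are only illustrative (your test would use its own Avro type), and the `ConsumerRecord` is built by hand here as a stand-in for what a consumer configured with `ByteArrayDeserializer` returns from `poll()`:

```java
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.HashMap;
import java.util.Map;

public class ManualAvroDecoding {

    // One mock registry shared by the serializer and the deserializer, so the
    // schema id written by one can be resolved by the other.
    static final SchemaRegistryClient REGISTRY = new MockSchemaRegistryClient();

    // Hypothetical helper: the URL is required by the config class, but the
    // mock client never connects to it.
    static Map<String, Object> serdeConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put("schema.registry.url", "http://fake-url");
        return config;
    }

    // Decodes the raw value of a record that was fetched as byte[]/byte[].
    static Object decodeValue(KafkaAvroDeserializer deserializer,
                              ConsumerRecord<byte[], byte[]> record) {
        return deserializer.deserialize(record.topic(), record.value());
    }

    public static void main(String[] args) {
        KafkaAvroSerializer serializer = new KafkaAvroSerializer(REGISTRY, serdeConfig());
        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer(REGISTRY, serdeConfig());

        // Stand-in for a record returned by consumer.poll(...): the value is
        // serialized outside of any producer, using the mock registry.
        byte[] value = serializer.serialize("test2", "hello");
        ConsumerRecord<byte[], byte[]> record =
                new ConsumerRecord<>("test2", 0, 0L, null, value);

        Object decoded = decodeValue(deserializer, record);
        System.out.println(record.topic() + " -> " + decoded);
    }
}
```

If the consumer logic expects a generated Avro class rather than a generic object, setting "specific.avro.reader" to true in the deserializer config should be what you need, but I have not verified that against 4.0 with the mock client.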

-Matthias

On 10/29/18 4:03 PM, chinchu chinchu wrote:
> Thanks Matthias. How do I deal with this scenario in the case of a
> consumer that expects a specific record type? I am trying to write an
> integration test for the scenario below. All of these use
> an Avro object as the value for the producer record. I am kind of able to
> write this for simple data types but have no luck with Avro.
>
> Produces record > topic > consumer > doWork() > produce.
>
> On Mon, Oct 29, 2018 at 4:50 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> You set:
>>
>>> senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,avroSerializer.getClass());
>>
>> This will tell the Producer to create a new AvroSerializer object, and
>> this object expects "schema.registry.url" to be set during
>> initialization, ie, you need to add the config to `senderProps`.
>>
>> However, this will overall not give you what you want IMHO, as the
>> Producer won't use MockSchemaRegistry for this case.
>>
>> You would need to use
>>
>>> avroSerializer = new KafkaAvroSerializer(schemaRegistry, new HashMap(defaultConfig));
>>
>> outside of the producer in your tests (ie, serialize the data
>> "manually"), and pass `byte[]/byte[]` type into the producer itself.
>>
>>
>> -Matthias
>>
>> On 10/29/18 2:56 PM, chinchu chinchu wrote:
>>> Hey folks,
>>> I am getting the below exception when using a mockSchemaRegistry in a
>>> junit test. I appreciate your help. I am using confluent 4.0
>>>
>>>
>>> private SchemaRegistryClient schemaRegistry;
>>> private KafkaAvroSerializer avroSerializer;
>>>
>>> Properties defaultConfig = new Properties();
>>> defaultConfig.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://fake-url");
>>> defaultConfig.put("auto.register.schemas", false);
>>> schemaRegistry = new MockSchemaRegistryClient();
>>> avroSerializer = new KafkaAvroSerializer(schemaRegistry, new HashMap(defaultConfig));
>>> Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
>>> senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
>>> senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,avroSerializer.getClass());
>>> KafkaProducer<String, Log> producer = new KafkaProducer<String,Log>(senderProps);
>>> ProducerRecord<String,Log> record =new ProducerRecord<String,Log>("test2","1",log);
>>> try {
>>>     producer.send(record).get();
>>> } catch (InterruptedException e) {
>>>     // TODO Auto-generated catch block
>>>     e.printStackTrace();
>>> }
>>>
>>> Error
>>> ==============
>>> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>>> at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:441)
>>> at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:268)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>>> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>>> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>>> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>>> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>>> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>>> at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
>>> at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
>>> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:538)
>>> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:760)
>>> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:460)
>>> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:206)
>>> Caused by: io.confluent.common.config.ConfigException: Missing required
>>> configuration "schema.registry.url" which has no default value.
>>> at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
>>> at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
>>> at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:61)
>>> at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
>>> at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
>>> at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.configure(ExtendedSerializer.java:60)
>>> at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:360)
>>> ... 28 more