Using the Kafka source in Akka Streams Kafka, I am trying to deserialize an 
Avro object with KafkaAvroDeserializer. The object's class was generated using avro-tools.

If I use the Confluent Kafka consumer directly, I can deserialize it like this:

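def consumerProperties(): Properties = {
  val props = new Properties
  props.put("bootstrap.servers", kafkaServer)
  props.put("group.id", UUID.randomUUID().toString)
  // the Java consumer names this setting enable.auto.commit
  props.put("enable.auto.commit", "false")
  props.put("auto.offset.reset", "earliest")
  // KafkaConsumer(Properties) requires both deserializer classes to be set
  props.put("key.deserializer", classOf[StringDeserializer].getName)
  props.put("value.deserializer", classOf[KafkaAvroDeserializer].getName)
  props.put("schema.registry.url", schemaUrl)
  props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
  props
}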

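def consume(): Unit = {
  val consumer = new KafkaConsumer[String, Person](consumerProperties())
  // subscribe before polling, otherwise poll has nothing to read from
  consumer.subscribe(java.util.Collections.singletonList(topic))

  val records: ConsumerRecords[String, Person] = consumer.poll(100)
  records.forEach(i => println(i.value().getName))
  consumer.close()
}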

However, I am unable to do the same with the Kafka source in Akka Streams Kafka.

The consumer settings are of type ConsumerSettings[String, AnyRef], and I think 
I need to change them to ConsumerSettings[String, Person] somehow.

val consumerSettings  = ConsumerSettings(system,new StringDeserializer, new 
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  .withProperty("schema.registry.url", schemaUrl)
  .withProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")

val subscriptions = Subscriptions.topics(topic)
val output = Consumer.plainSource(consumerSettings, subscriptions)
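
For illustration, here is a minimal sketch of one way the settings could be narrowed to ConsumerSettings[String, Person]: wrap KafkaAvroDeserializer in a typed Deserializer and cast the result. TypedAvroDeserializer and TypedSettings are hypothetical names I made up, not part of Akka Streams Kafka or the Confluent libraries. The sketch also assumes that a deserializer instance passed in directly has to be configured by hand, because Kafka only calls configure on deserializers it instantiates from class names.

```scala
import java.util
import java.util.UUID

import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import io.confluent.kafka.serializers.{KafkaAvroDeserializer, KafkaAvroDeserializerConfig}
import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}

// Hypothetical helper: narrows the Object-typed KafkaAvroDeserializer
// to a concrete generated Avro class T by delegating and casting.
class TypedAvroDeserializer[T <: SpecificRecord] extends Deserializer[T] {
  private val inner = new KafkaAvroDeserializer()

  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
    inner.configure(configs, isKey)

  // The cast only holds if the specific Avro reader is enabled in configure.
  override def deserialize(topic: String, data: Array[Byte]): T =
    inner.deserialize(topic, data).asInstanceOf[T]

  override def close(): Unit = inner.close()
}

// Hypothetical factory: builds typed settings from the same values the
// question uses (kafkaServer, schemaUrl).
object TypedSettings {
  def forValue[T <: SpecificRecord](
      system: ActorSystem,
      kafkaServer: String,
      schemaUrl: String): ConsumerSettings[String, T] = {
    // Configure the wrapper manually, since it is passed as an instance.
    val avroConfig = new util.HashMap[String, String]()
    avroConfig.put("schema.registry.url", schemaUrl)
    avroConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")

    val valueDeserializer = new TypedAvroDeserializer[T]
    valueDeserializer.configure(avroConfig, false) // false = value, not key

    ConsumerSettings(system, new StringDeserializer, valueDeserializer)
      .withBootstrapServers(kafkaServer)
      .withGroupId(UUID.randomUUID().toString)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  }
}
```

With something like this, TypedSettings.forValue[Person](system, kafkaServer, schemaUrl) should give a ConsumerSettings[String, Person], so Consumer.plainSource would emit ConsumerRecord[String, Person] elements. A simpler alternative would be to keep the AnyRef settings and cast inside the stream with .map(_.value().asInstanceOf[Person]).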


