You say the data is Avro, but your Spark example specifies the key and
value types as String and uses String decoders.

If the Spark example works fine, you can just run the console consumer
without the formatter and read the strings...
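If you do need the Avro decoding inside the Spark job itself, one option is to
consume the value as raw bytes and decode it with Avro's GenericDatumReader. A
minimal sketch, assuming the same Spark 1.x direct-stream API as your code, and
assuming the messages are plain binary Avro matching schema.txt (not the
Confluent wire format, which prepends a schema-registry header):

```scala
import java.io.File

import kafka.serializer.DefaultDecoder
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ConsumeAvroTest {

  def main(args: Array[String]) {
    val sc = new SparkContext("local[*]", "ConsumeKafkaAvro")
    sc.setLogLevel("ERROR")
    val ssc = new StreamingContext(sc, Seconds(1))

    val kafkaParams = Map("metadata.broker.list" -> "brokername:9092")
    val topics = Set("mytopic")

    // Read values as raw bytes instead of decoding them as strings
    val raw = KafkaUtils.createDirectStream[
      Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
      ssc, kafkaParams, topics).map(_._2)

    val json = raw.mapPartitions { iter =>
      // Build the schema and reader per partition: Avro readers are not
      // serializable, so they cannot be created on the driver and shipped.
      // schema.txt must be readable on the executors (or broadcast the
      // schema string instead of reading a file here).
      val schema = new Schema.Parser().parse(new File("schema.txt"))
      val reader = new GenericDatumReader[GenericRecord](schema)
      iter.map { bytes =>
        val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
        // GenericRecord.toString renders the record as JSON
        reader.read(null, decoder).toString
      }
    }

    json.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

This is untested against your topic; the broker, topic, and schema file names
are placeholders carried over from your example.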

On Mon, Feb 6, 2017 at 7:41 PM, kush batra <kush.ba...@gmail.com> wrote:
> Hi Team,
>
> I am trying to implement the functionality of the kafka-console-consumer
> command below (which works well and outputs the intended JSON data) as a
> Spark Streaming program.
>
> kafka-console-consumer.sh --zookeeper host.xxxx.com:2181,host.xxxx.com:2181
> --topic mytopic --formatter CustomAvroMessageFormatter --property
> "formatter-schema-file= schema.txt" > /var/tmp/myfile.json&
>
> I am able to read messages from the above topic programmatically using
> KafkaUtils in Spark Streaming, with the Scala code below, which works well:
>
> import kafka.serializer.StringDecoder
> import org.apache.spark.SparkContext
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> object ConsumeTest {
>
>   def main(args: Array[String]) {
>     val sc = new SparkContext("local[*]", "ConsumeKafkaMsg")
>     sc.setLogLevel("ERROR")
>     val ssc = new StreamingContext(sc, Seconds(1))
>
>     // Read from the brokers directly
>     val kafkaParams = Map("metadata.broker.list" -> "brokername:9092")
>     val topics = Set("mytopic")
>
>     val lines = KafkaUtils.createDirectStream[
>       String, String, StringDecoder, StringDecoder](
>       ssc, kafkaParams, topics).map(_._2)
>
>     lines.print()
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> However, the above program reads messages in a raw format, similar to the
> following:
>
> ��Cߣ�ߕ'윺~�_,��M˶/��Ѯ!򇜾�Vcusomtername client
> 2X3XXXXXX-sasadsad-4673-212c-dsdsadsad
> value
> ,"question"logName
> successstԇ���V
>
> In contrast, the command above uses a custom Kafka message formatter to
> convert the raw format to JSON using an Avro schema. I am unable to find out
> how to do the equivalent of the message formatter in my program, which is
> what I need to achieve.
>
> Below is the approximate Avro schema (schema.txt) for reference (the actual
> schema to process is much more complex):
>
> {
>   "type" : "record",
>   "namespace" : "mynamespace",
>   "name" : "myname",
>   "fields" : [{
>     "name" : "field1",
>     "type" : {
>       "type" : "record",
>       "name" : "Eventfield1",
>       "fields" : [{.....}]
>     }
>   }]
> }
>
> Please help to implement the same.
>
> Regards,
> Kush



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog
