I am reading data from Kafka using Spark Streaming.

Each message is a JSON string, and I create a DataFrame from the messages in each batch.
I am using PySpark.

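kvs = KafkaUtils.createDirectStream(ssc, kafkaTopic1, kafkaParams)

# Each Kafka record is a (key, value) tuple; keep only the JSON value.
lines = kvs.map(lambda x: x[1])

def mReport(clickRDD):
    # Infer a schema from the JSON strings in this batch.
    clickDF = sqlContext.jsonRDD(clickRDD)
    # The query below reads from "clickstream", so register the DataFrame under that name.
    clickDF.registerTempTable("clickstream")

    PagesDF = sqlContext.sql(
        "SELECT request.clientIP AS ip "
        "FROM clickstream "
        "WHERE request.clientIP IS NOT NULL "
        "LIMIT 2000"
    )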

The problem is that not all of the JSON messages in the stream have the same format.

The query works when it reads JSON that contains request.clientIP.

Some of the JSON strings do not have a client IP in their schema.

When the job encounters such JSON, I get an error and the job fails.

How do I read only the JSON messages that have an IP in their schema?

Please suggest.
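
To make the goal concrete, here is a minimal sketch of one possible approach: drop the messages without a client IP before they ever reach sqlContext.jsonRDD. The helper name has_client_ip and the sample messages are hypothetical; the field path request.clientIP is taken from the query above. It has not been tried against the actual stream.

```python
import json


def has_client_ip(raw):
    """Return True if raw is a JSON object with a non-null request.clientIP.

    Hypothetical helper: the field path comes from the SQL query in the question.
    """
    try:
        record = json.loads(raw)
    except (TypeError, ValueError):
        # Not a string, or not valid JSON: treat it as "no IP".
        return False
    if not isinstance(record, dict):
        return False
    request = record.get("request")
    return isinstance(request, dict) and request.get("clientIP") is not None


# Made-up sample messages, for illustration only.
samples = [
    '{"request": {"clientIP": "10.0.0.1"}}',
    '{"request": {"url": "/home"}}',
    '{"event": "heartbeat"}',
    'not json',
]
kept = [s for s in samples if has_client_ip(s)]
```

In the streaming job this would presumably be applied as lines = kvs.map(lambda x: x[1]).filter(has_client_ip). A batch may be empty after filtering, so mReport would likely also need to return early when clickRDD.isEmpty() is true.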

