I am reading data from Kafka using Spark Streaming with PySpark. I read the JSON messages and create a DataFrame from them:
```python
from pyspark.streaming.kafka import KafkaUtils

# mReport has to be defined before it is passed to foreachRDD
def mReport(clickRDD):
    # infer a schema from this batch's JSON strings and expose it to SQL
    clickDF = sqlContext.jsonRDD(clickRDD)
    clickDF.registerTempTable("clickstream")
    PagesDF = sqlContext.sql(
        "SELECT request.clientIP as ip "
        "FROM clickstream "
        "WHERE request.clientIP is not null "
        "limit 2000")

kvs = KafkaUtils.createDirectStream(ssc, kafkaTopic1, kafkaParams)
lines = kvs.map(lambda x: x[1])  # keep only the message value (the JSON string)
lines.foreachRDD(mReport)
```

The problem is that not all the JSON messages in the stream have the same format. It works when a message has an IP, but some of the JSON strings do not have `clientIP` in their schema, so I get an error and my job fails when it encounters such a message. How do I read only the JSON messages that have the IP in their schema? Please suggest.
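One possible approach is to drop the messages without an IP before Spark infers a schema, so the SQL query never sees a batch where `request.clientIP` is missing. Below is a minimal sketch of a plain-Python predicate for that. It assumes each Kafka message value is a single JSON object and that the IP lives at `request.clientIP`, as in the query above; the function name `has_client_ip` is hypothetical.

```python
import json


def has_client_ip(raw):
    """Return True if the JSON string has a non-null request.clientIP field.

    Assumption: each message is one JSON object with the IP at
    request.clientIP. Malformed JSON is treated as "no IP" instead of
    raising, so a single bad message does not fail the whole batch.
    """
    try:
        record = json.loads(raw)
    except (TypeError, ValueError):
        return False
    if not isinstance(record, dict):
        return False
    request = record.get("request")
    return isinstance(request, dict) and request.get("clientIP") is not None


samples = [
    '{"request": {"clientIP": "10.0.0.1"}}',
    '{"request": {"url": "/home"}}',
    'not json',
]
print([has_client_ip(s) for s in samples])  # [True, False, False]
```

In the stream this could be applied with the DStream `filter` transformation before `foreachRDD`, e.g. `lines.filter(has_client_ip).foreachRDD(mReport)`, so `jsonRDD` only sees records that contain the field. Since a batch may end up empty after filtering, it may also be worth returning early from `mReport` when `clickRDD.isEmpty()` is true.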