Have you tried filtering out corrupt records with something along the lines of

    df.filter(df("_corrupt_record").isNull)

A fuller PySpark sketch follows below the quoted message.

On Tue, Jul 26, 2016 at 1:53 PM, vr spark <vrspark...@gmail.com> wrote:

> I am reading data from Kafka using Spark Streaming.
>
> I am reading JSON and creating a DataFrame. I am using PySpark.
>
>     kvs = KafkaUtils.createDirectStream(ssc, kafkaTopic1, kafkaParams)
>
>     lines = kvs.map(lambda x: x[1])
>
>     lines.foreachRDD(mReport)
>
>     def mReport(clickRDD):
>         clickDF = sqlContext.jsonRDD(clickRDD)
>         clickDF.registerTempTable("clickstream")
>         PagesDF = sqlContext.sql(
>             "SELECT request.clientIP as ip "
>             "FROM clickstream "
>             "WHERE request.clientIP is not null "
>             "LIMIT 2000")
>
> The problem is that not all the JSON records from the stream have the same format.
>
> It works when it reads a JSON record that has the IP.
>
> Some of the JSON strings do not have the client IP in their schema, so I am getting an error and my job fails when it encounters such a record.
>
> How do I read only those JSON records that have the IP in their schema?
>
> Please suggest.