Hi, I tried that and am still getting the exception. Any other suggestion?

clickDF = cDF.filter(cDF['request.clientIP'].isNotNull())
It fails for some cases and errors out with the message below:

AnalysisException: u'No such struct field clientIP in cookies, nscClientIP1, nscClientIP2, uAgent;'

On Tue, Jul 26, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:
> Have you tried filtering out corrupt records with something along the
> lines of
>
> df.filter(df("_corrupt_record").isNull)
>
> On Tue, Jul 26, 2016 at 1:53 PM, vr spark <vrspark...@gmail.com> wrote:
> > I am reading data from Kafka using Spark Streaming.
> >
> > I am reading JSON and creating a DataFrame. I am using pyspark.
> >
> > kvs = KafkaUtils.createDirectStream(ssc, kafkaTopic1, kafkaParams)
> > lines = kvs.map(lambda x: x[1])
> > lines.foreachRDD(mReport)
> >
> > def mReport(clickRDD):
> >     clickDF = sqlContext.jsonRDD(clickRDD)
> >     clickDF.registerTempTable("clickstream")
> >     PagesDF = sqlContext.sql(
> >         "SELECT request.clientIP as ip "
> >         "FROM clickstream "
> >         "WHERE request.clientIP is not null "
> >         "LIMIT 2000")
> >
> > The problem is that not all the JSON records from the stream have the
> > same format.
> >
> > It works when it reads a JSON record which has an ip.
> >
> > Some of the JSON strings do not have clientIP in their schema, so I
> > get an error and my job fails when it encounters such a record.
> >
> > How do I read only those JSON records which have ip in their schema?
> >
> > Please suggest.
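The AnalysisException happens because the inferred schema for some batches has no clientIP field inside the request struct at all, so even the isNotNull filter cannot be resolved. One workaround (not from the thread, just a sketch) is to pre-filter the raw JSON strings with plain Python before handing the RDD to jsonRDD, keeping only records that actually contain request.clientIP. The helper name has_client_ip is hypothetical:

```python
import json

def has_client_ip(raw_line):
    # Keep only records whose parsed JSON has a non-null request.clientIP.
    # Malformed JSON is treated as "no clientIP" rather than raising.
    try:
        record = json.loads(raw_line)
    except ValueError:
        return False
    request = record.get("request")
    if not isinstance(request, dict):
        return False
    return request.get("clientIP") is not None

# Inside mReport, filter the RDD before building the DataFrame
# (sketch, assuming the same names as in the quoted code):
#   goodRDD = clickRDD.filter(has_client_ip)
#   clickDF = sqlContext.jsonRDD(goodRDD)
```

Because every record reaching jsonRDD now has request.clientIP, the inferred schema always contains the field and the SQL query resolves. The tradeoff is parsing each line twice (once in the filter, once in jsonRDD); the alternative is to supply an explicit schema instead of relying on inference.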