HI ,
I tried and getting exception still..any other suggestion?

clickDF = cDF.filter(cDF['request.clientIP'].isNotNull())

It fails for some cases and errors our with below message

AnalysisException: u'No such struct field clientIP in cookies,
nscClientIP1, nscClientIP2, uAgent;'

On Tue, Jul 26, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Have you tried filtering out corrupt records with something along the
> lines of
>
>  df.filter(df("_corrupt_record").isNull)
>
> On Tue, Jul 26, 2016 at 1:53 PM, vr spark <vrspark...@gmail.com> wrote:
> > i am reading data from kafka using spark streaming.
> >
> > I am reading json and creating dataframe.
> > I am using pyspark
> >
> > kvs = KafkaUtils.createDirectStream(ssc, kafkaTopic1, kafkaParams)
> >
> > lines = kvs.map(lambda x: x[1])
> >
> > lines.foreachRDD(mReport)
> >
> > def mReport(clickRDD):
> >
> >    clickDF = sqlContext.jsonRDD(clickRDD)
> >
> >    clickDF.registerTempTable("clickstream")
> >
> >    PagesDF = sqlContext.sql(
> >
> >             "SELECT   request.clientIP as ip "
> >
> >             "FROM clickstream "
> >
> >             "WHERE request.clientIP is not null "
> >
> >             " limit 2000 "
> >
> >
> > The problem is that not all the jsons from the stream have the same
> format.
> >
> > It works when it reads a json which has ip.
> >
> > Some of the json strings do not have client ip in their schema.
> >
> > So i am getting error and my job is failing when it encounters such a
> json.
> >
> > How do read only those json which has ip in their schema?
> >
> > Please suggest.
>

Reply via email to