Have you tried filtering out corrupt records? In Scala that would be something along the lines of:

 df.filter(df("_corrupt_record").isNull)
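
In PySpark (which you are using), a rough equivalent would be the
following. This is an untested sketch: it assumes the JSON was parsed in
the default PERMISSIVE mode and that at least one malformed record was
actually seen, so that a _corrupt_record column exists in the inferred
schema; otherwise referencing the column will fail.

 # keep only rows that parsed cleanly (assumes _corrupt_record exists)
 cleanDF = clickDF.filter(clickDF["_corrupt_record"].isNull())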

On Tue, Jul 26, 2016 at 1:53 PM, vr spark <vrspark...@gmail.com> wrote:
> I am reading data from Kafka using Spark Streaming.
>
> I am reading JSON and creating a dataframe. I am using pyspark.
>
> kvs = KafkaUtils.createDirectStream(ssc, kafkaTopic1, kafkaParams)
> lines = kvs.map(lambda x: x[1])
> lines.foreachRDD(mReport)
>
> def mReport(clickRDD):
>     clickDF = sqlContext.jsonRDD(clickRDD)
>     clickDF.registerTempTable("clickstream")
>     PagesDF = sqlContext.sql(
>         "SELECT request.clientIP as ip "
>         "FROM clickstream "
>         "WHERE request.clientIP is not null "
>         "LIMIT 2000")
>
> The problem is that not all the JSON documents in the stream have the
> same format.
>
> The query works when it reads a JSON document that has the ip field, but
> some of the JSON strings do not have the client ip in their schema, so I
> get an error and my job fails when it encounters such a document.
>
> How do I read only those JSON documents which have ip in their schema?
>
> Please suggest.
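
That said, the failure you describe sounds less like corrupt records and
more like schema variance, which the filter above won't fix on its own.
One option for that (an untested sketch, assuming Spark 1.4+ where
sqlContext.read.json accepts an RDD of strings) is to supply an explicit
schema instead of relying on jsonRDD's inference, so records that lack
request.clientIP simply get null in that column:

 from pyspark.sql.types import StructType, StructField, StringType

 # minimal hypothetical schema; list only the fields the query needs
 schema = StructType([
     StructField("request", StructType([
         StructField("clientIP", StringType(), True)
     ]), True)
 ])

 # records without request.clientIP yield null in that column
 # instead of causing an analysis error at query time
 clickDF = sqlContext.read.json(clickRDD, schema=schema)

With that in place, your existing "WHERE request.clientIP is not null"
clause should skip the records that don't carry the field.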
