No, I literally meant filtering on _corrupt_record, which has a magic
meaning in the DataFrame API: it identifies lines that didn't match the
schema.
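The idea behind _corrupt_record can be sketched without Spark. This is a plain-Python illustration (not the DataFrame API): lines that fail to parse against an assumed schema keep only their raw text under a _corrupt_record key, and the filter the thread suggests keeps rows where that key is null. EXPECTED_FIELDS and parse_with_corrupt_record are hypothetical names for illustration.

```python
import json

EXPECTED_FIELDS = {"request"}  # hypothetical schema for this sketch


def parse_with_corrupt_record(line):
    """Mimic Spark's permissive JSON reader: a line that fails to parse
    (or lacks the expected top-level fields) is kept, but only its raw
    text is stored under _corrupt_record; good lines get None there."""
    try:
        record = json.loads(line)
        if not EXPECTED_FIELDS.issubset(record):
            raise ValueError("schema mismatch")
        record["_corrupt_record"] = None
        return record
    except (ValueError, TypeError):
        return {"_corrupt_record": line}


lines = ['{"request": {"clientIP": "1.2.3.4"}}', 'not json at all']
rows = [parse_with_corrupt_record(l) for l in lines]

# The filter from the thread, df.filter(df['_corrupt_record'].isNull()),
# corresponds to keeping rows whose _corrupt_record is None:
clean = [r for r in rows if r["_corrupt_record"] is None]
```

In pyspark itself the same filter would be written against the DataFrame column, e.g. `df.filter(df['_corrupt_record'].isNull())`.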
On Wed, Jul 27, 2016 at 12:19 PM, vr spark wrote:
> Hi,
> I tried and am still getting an exception. Any other suggestion?
>
> clickDF = cDF.filter(cDF['request.clientIP'].isNotNull())
>
> It fails for some cases and errors out with the message below:
>
> AnalysisException: u'No such struct field clientIP in cookies,
> nscClientIP1, nscClientIP2, uAgent;'
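That exception says the request struct in some batches only contains the fields cookies, nscClientIP1, nscClientIP2, and uAgent, i.e. clientIP is absent from the inferred schema, so referencing it fails before any null check can run. In plain-Python terms (a sketch, not the DataFrame API, with made-up sample records) the safe pattern is to test for the field's presence rather than assume it exists:

```python
# Hypothetical records mirroring the two schemas the thread describes:
records = [
    {"request": {"clientIP": "1.2.3.4", "uAgent": "curl"}},
    {"request": {"cookies": "a=1", "nscClientIP1": "5.6.7.8",
                 "nscClientIP2": "9.9.9.9", "uAgent": "moz"}},
]

# cDF['request.clientIP'].isNotNull() errors outright when clientIP is
# missing from the schema; a dict .get() lookup never does.
with_ip = [r for r in records
           if r["request"].get("clientIP") is not None]
```

In Spark the analogous guard is to check the DataFrame's schema (e.g. the field names under request) before referencing a nested column, or to read the JSON with an explicit schema so clientIP is always present (and null when absent from the data).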
Have you tried filtering out corrupt records with something along the lines of
df.filter(df("_corrupt_record").isNull)
On Tue, Jul 26, 2016 at 1:53 PM, vr spark wrote:
> I am reading data from Kafka using Spark Streaming.
>
> I am reading JSON and creating a dataframe.
> I am using pyspark.
>
> kvs = KafkaUtils.createDirectStream(ssc, kafkaTopic1, kafkaParams)
> lines = kvs.map(lambda x: x[1])
> lines.foreachRDD(mReport)
>
> def mReport(clickRDD):
>     clickDF = sqlContext.jsonRDD(clickRDD)
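The per-batch flow above (take each RDD of raw JSON strings, build a DataFrame, then filter) can be sketched without Spark as follows; m_report and the sample batch are illustrative stand-ins, not the thread's actual data. Note that on Spark 1.4+ `sqlContext.read.json(rdd)` replaces the deprecated `jsonRDD`.

```python
import json


def m_report(click_batch):
    """Per-batch handler mirroring mReport: parse each JSON line, then
    keep only records that actually carry a request.clientIP (the
    filter the thread converges on)."""
    rows = []
    for line in click_batch:
        try:
            rows.append(json.loads(line))
        except ValueError:
            # Corrupt line: skip it, as filtering _corrupt_record would.
            continue
    return [r for r in rows if r.get("request", {}).get("clientIP")]


batch = ['{"request": {"clientIP": "1.2.3.4"}}',
         '{"request": {"uAgent": "moz"}}',
         'garbage']
good = m_report(batch)
```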