Sure thing, I'll see if I can isolate this.

Regards.

James

On 4 March 2016 at 12:24, Ted Yu <yuzhih...@gmail.com> wrote:

> If you can reproduce the following with a unit test, I suggest you open a
> JIRA.
>
> Thanks
>
> On Mar 4, 2016, at 4:01 AM, James Hammerton <ja...@gluru.co> wrote:
>
> Hi,
>
> I've come across some strange behaviour with Spark 1.6.0.
>
> In the code below, the filtering by "eventName" only seems to work if I
> call .cache on the resulting DataFrame.
>
> If I don't do this, the code crashes inside the UDF because it processes
> an event that the filter should get rid of.
>
> Any ideas why this might be the case?
>
> The code is as follows:
>
>       val df = sqlContext.read.parquet(inputPath)
>       val filtered = df.filter(df("eventName").equalTo(Created))
>       // Caching seems to be required for the filter to work
>       val extracted = extractEmailReferences(sqlContext, filtered.cache)
>       extracted.write.parquet(outputPath)
>
>
> where extractEmailReferences does this:
>
>
> def extractEmailReferences(sqlContext: SQLContext, df: DataFrame): DataFrame = {
>   val extracted = df.select(df(EventFieldNames.ObjectId),
>     extractReferencesUDF(df(EventFieldNames.EventJson),
>       df(EventFieldNames.ObjectId), df(EventFieldNames.UserId)) as "references")
>
>   extracted.filter(extracted("references").notEqual("UNKNOWN"))
> }
>
>
> and extractReferencesUDF:
>
> def extractReferencesUDF = udf(extractReferences(_: String, _: String, _: String))
>
> def extractReferences(eventJson: String, objectId: String, userId: String): String = {
>     import org.json4s.jackson.Serialization
>     import org.json4s.NoTypeHints
>     implicit val formats = Serialization.formats(NoTypeHints)
>
>     // This is where the code crashes if the .cache isn't called
>     val created = Serialization.read[GMailMessage.Created](eventJson)
>
>
>  Regards,
>
> James
>
>
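
A minimal sketch of one way to keep the UDF from crashing while the problem
is being isolated. It assumes the failure is an exception thrown when the
JSON of a non-Created event is deserialized. Spark SQL does not promise that
a UDF only ever sees rows that have already passed an earlier filter, so a
UDF that tolerates unexpected rows may be the safer design regardless of the
root cause. Here parseCreated is a hypothetical stand-in for the
Serialization.read call in extractReferences, and returning "UNKNOWN" on
failure is an assumption, chosen so that the existing notEqual("UNKNOWN")
filter would drop those rows.

```scala
import scala.util.Try

object SafeExtract {
  // Hypothetical stand-in for Serialization.read[GMailMessage.Created](eventJson):
  // it throws for any input that is not a "Created" event.
  def parseCreated(eventJson: String): String = {
    if (!eventJson.contains("\"Created\""))
      throw new IllegalArgumentException("not a Created event")
    "refs"
  }

  // Wrap the parse in Try so that an unexpected row (or a null value)
  // yields "UNKNOWN" instead of an exception.
  def extractReferencesSafe(eventJson: String): String =
    Try(parseCreated(eventJson)).getOrElse("UNKNOWN")
}
```

With Spark on the classpath, a function of this shape could be wrapped as
udf(SafeExtract.extractReferencesSafe _). This only hides the symptom; it
does not explain why the filter appears to be skipped without .cache.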
