If you can reproduce the following with a unit test, I suggest you open a JIRA. 
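
A standalone reproduction might look roughly like the sketch below. It is only a starting point and I have not confirmed that it triggers the problem. The object name, the two sample rows, the temporary path and the strictParse stand-in for your extractReferences are all made up for illustration. It assumes Spark 1.6 on the classpath and a local master, and it could be wrapped in whatever test framework you already use.

```scala
import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.udf

import scala.util.Try

object FilterBeforeUdfRepro {
  // Hypothetical stand-in for extractReferences: throws on any row
  // that the eventName filter was expected to remove.
  def strictParse(eventJson: String): String =
    if (eventJson.startsWith("{")) "ok"
    else throw new IllegalArgumentException(s"unexpected event: $eventJson")

  // Same shape as the pipeline in the original mail: filter, optionally
  // cache, apply the UDF, then filter on the UDF's output.
  def run(sqlContext: SQLContext, path: String, useCache: Boolean): Try[Int] = {
    val parseUDF = udf(strictParse(_: String))
    val df = sqlContext.read.parquet(path)
    val filtered = df.filter(df("eventName").equalTo("Created"))
    val input = if (useCache) filtered.cache() else filtered
    val extracted = input.select(parseUDF(input("eventJson")) as "references")
    Try(extracted.filter(extracted("references").notEqual("UNKNOWN")).collect().length)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("filter-udf-repro")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      // Write two rows to Parquet so the read path matches the original report.
      val path = Files.createTempDirectory("repro").resolve("events").toString
      Seq(("Created", """{"id": 1}"""), ("Deleted", "not-json"))
        .toDF("eventName", "eventJson")
        .write.parquet(path)

      // A Failure in the first line and a Success in the second would
      // match the behaviour described in the report.
      println(s"without cache: ${run(sqlContext, path, useCache = false)}")
      println(s"with cache:    ${run(sqlContext, path, useCache = true)}")
    } finally {
      sc.stop()
    }
  }
}
```

If the two runs differ, attaching this kind of program to the JIRA should make the issue much easier to triage.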

Thanks

> On Mar 4, 2016, at 4:01 AM, James Hammerton <ja...@gluru.co> wrote:
> 
> Hi,
> 
> I've come across some strange behaviour with Spark 1.6.0. 
> 
> In the code below, the filtering by "eventName" only seems to work if I 
> call .cache on the resulting DataFrame. 
> 
> If I don't do this, the code crashes inside the UDF because it processes an 
> event that the filter should get rid of.
> 
> Any ideas why this might be the case?
> 
> The code is as follows:
>>       val df = sqlContext.read.parquet(inputPath)
>>       val filtered = df.filter(df("eventName").equalTo(Created))
>>       // Caching seems to be required for the filter to work
>>       val extracted = extractEmailReferences(sqlContext, filtered.cache)
>>       extracted.write.parquet(outputPath)
> 
> where extractEmailReferences does this:
>>  
>> def extractEmailReferences(sqlContext: SQLContext, df: DataFrame): DataFrame = {
>>     val extracted = df.select(df(EventFieldNames.ObjectId),
>>       extractReferencesUDF(df(EventFieldNames.EventJson),
>>         df(EventFieldNames.ObjectId), df(EventFieldNames.UserId)) as "references")
>> 
>>     extracted.filter(extracted("references").notEqual("UNKNOWN"))
>> }
>  
> and extractReferencesUDF:
>> def extractReferencesUDF = udf(extractReferences(_: String, _: String, _: String))
>> def extractReferences(eventJson: String, objectId: String, userId: String): String = {
>>     import org.json4s.jackson.Serialization
>>     import org.json4s.NoTypeHints
>>     implicit val formats = Serialization.formats(NoTypeHints)
>> 
>>     // This is where the code crashes if the .cache isn't called
>>     val created = Serialization.read[GMailMessage.Created](eventJson)
> 
>  Regards,
> 
> James
