Sure thing, I'll see if I can isolate this. Regards.
James

On 4 March 2016 at 12:24, Ted Yu <yuzhih...@gmail.com> wrote:
> If you can reproduce the following with a unit test, I suggest you open a
> JIRA.
>
> Thanks
>
> On Mar 4, 2016, at 4:01 AM, James Hammerton <ja...@gluru.co> wrote:
>
> Hi,
>
> I've come across some strange behaviour with Spark 1.6.0.
>
> In the code below, the filtering by "eventName" only seems to work if I
> call .cache on the resulting DataFrame.
>
> If I don't do this, the code crashes inside the UDF because it processes
> an event that the filter should get rid of.
>
> Any ideas why this might be the case?
>
> The code is as follows:
>
>> val df = sqlContext.read.parquet(inputPath)
>> val filtered = df.filter(df("eventName").equalTo(Created))
>> val extracted = extractEmailReferences(sqlContext, filtered.cache) // Caching seems to be required for the filter to work
>> extracted.write.parquet(outputPath)
>
> where extractEmailReferences does this:
>
>> def extractEmailReferences(sqlContext: SQLContext, df: DataFrame): DataFrame = {
>>   val extracted = df.select(df(EventFieldNames.ObjectId),
>>     extractReferencesUDF(df(EventFieldNames.EventJson), df(EventFieldNames.ObjectId), df(EventFieldNames.UserId)) as "references")
>>
>>   extracted.filter(extracted("references").notEqual("UNKNOWN"))
>> }
>
> and extractReferencesUDF:
>
>> def extractReferencesUDF = udf(extractReferences(_: String, _: String, _: String))
>>
>> def extractReferences(eventJson: String, objectId: String, userId: String): String = {
>>   import org.json4s.jackson.Serialization
>>   import org.json4s.NoTypeHints
>>   implicit val formats = Serialization.formats(NoTypeHints)
>>
>>   val created = Serialization.read[GMailMessage.Created](eventJson) // This is where the code crashes if the .cache isn't called
>
> Regards,
>
> James
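A possible mitigation while the problem is being isolated, offered as a sketch rather than a confirmed fix: Spark SQL does not guarantee the order in which subexpressions are evaluated, and the optimizer is free to rewrite and reorder filters around a projection. So the UDF may end up running on rows that the "eventName" filter would otherwise have dropped. Making the UDF tolerant of such rows avoids the crash regardless of plan shape. In the code below, parseCreated is a hypothetical stand-in for Serialization.read[GMailMessage.Created], so the block runs without json4s. The "objectId:userId" return value is a hypothetical placeholder for whatever extractReferences really returns. "UNKNOWN" is the sentinel that extractEmailReferences already filters out.

```scala
import scala.util.{Failure, Success, Try}

object DefensiveExtraction {
  // Sentinel already filtered out by extractEmailReferences.
  val Unknown = "UNKNOWN"

  // HYPOTHETICAL stand-in for Serialization.read[GMailMessage.Created]:
  // throws when the JSON is not a "Created" event, mimicking the crash.
  def parseCreated(eventJson: String): String =
    if (eventJson.contains("\"eventName\":\"Created\"")) eventJson
    else throw new IllegalArgumentException(s"not a Created event: $eventJson")

  // Wraps the parse in Try so that a row the eventName filter should have
  // removed (or a null column value) yields the sentinel instead of an
  // exception that fails the task.
  def extractReferencesSafe(eventJson: String, objectId: String, userId: String): String =
    Try(parseCreated(eventJson)) match {
      case Success(_) => s"$objectId:$userId" // HYPOTHETICAL result format
      case Failure(_) => Unknown
    }
}
```

In the Spark job this would be wrapped the same way as the original, e.g. udf(DefensiveExtraction.extractReferencesSafe _). The existing notEqual("UNKNOWN") filter then discards the rows the UDF could not parse, so the result no longer depends on whether .cache happens to change the plan.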