Are you suggesting against the usage of HashSet?

On Tue, Jun 6, 2017 at 3:36 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
> This may be because HashSet is a mutable data structure, and it seems
> you are actually mutating it in "set1 ++ set2". I suggest creating a new
> HashSet in the function (and adding both sets into it), rather than
> mutating one of them.
>
> On Tue, Jun 6, 2017 at 11:30 AM, SRK <swethakasire...@gmail.com> wrote:
>
>> Hi,
>>
>> I see the following error when I use ReduceByKeyAndWindow in my Spark
>> Streaming app. I use reduce, invReduce and filterFunction as shown below.
>> Any idea as to why I get the error?
>>
>> java.lang.Exception: Neither previous window has value for key, nor new
>> values found. Are you sure your key class hashes consistently?
>>
>> def reduceWithHashSet: ((Long, HashSet[String]), (Long, HashSet[String]))
>>     => (Long, HashSet[String]) = {
>>   case ((timeStamp1: Long, set1: HashSet[String]),
>>         (timeStamp2: Long, set2: HashSet[String])) =>
>>     (Math.max(timeStamp1, timeStamp2), set1 ++ set2)
>> }
>>
>> def invReduceWithHashSet: ((Long, HashSet[String]), (Long, HashSet[String]))
>>     => (Long, HashSet[String]) = {
>>   case ((timeStamp1: Long, set1: HashSet[String]),
>>         (timeStamp2: Long, set2: HashSet[String])) =>
>>     (Math.max(timeStamp1, timeStamp2), set1.diff(set2))
>> }
>>
>> def filterFuncForInvReduce: ((String, (Long, HashSet[String]))) => Boolean = {
>>   case ((metricName: String, (timeStamp: Long, set: HashSet[String]))) =>
>>     set.size > 0
>> }
>>
>> --
>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Exception-which-using-ReduceByKeyAndWindow-in-Spark-Streaming-tp28748.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
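One way to follow the suggestion above is a minimal sketch like the following, which rewrites the three functions against scala.collection.immutable.HashSet, so `++` and `diff` always build a new set and never touch either input. The function names and tuple shapes mirror the ones quoted in the thread; whether this resolves the window exception in any given job is an assumption, not something confirmed here.

```scala
import scala.collection.immutable.HashSet

// Reduce: keep the newest timestamp; `++` on an immutable HashSet
// returns a fresh set containing both inputs' elements.
def reduceWithHashSet(
    a: (Long, HashSet[String]),
    b: (Long, HashSet[String])): (Long, HashSet[String]) =
  (math.max(a._1, b._1), a._2 ++ b._2)

// Inverse reduce: `diff` likewise returns a new set, leaving both
// arguments untouched.
def invReduceWithHashSet(
    a: (Long, HashSet[String]),
    b: (Long, HashSet[String])): (Long, HashSet[String]) =
  (math.max(a._1, b._1), a._2.diff(b._2))

// Filter: drop keys whose set has become empty after inverse-reducing.
def filterFuncForInvReduce(kv: (String, (Long, HashSet[String]))): Boolean =
  kv._2._2.nonEmpty
```

Because every intermediate value is a distinct immutable set, the windowed state Spark retains between batches can no longer be corrupted by a later reduce call, which is the failure mode the reply above is pointing at.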