Hi TD,

I am still seeing this issue with any immutable data structure. Any idea why
this happens? I use scala.collection.immutable.List[String], and my reduce
and inverse reduce do the following.

visitorSet1 ++ visitorSet2



visitorSet1.filterNot(visitorSet2.contains(_))
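
To make the behaviour of these two operations concrete, here is a minimal, self-contained sketch in plain Scala (no Spark) of what they do when the same visitor appears in more than one batch. The object name InverseReduceCheck and the sample values are hypothetical, made up for illustration. It shows that filterNot does not necessarily undo ++ for overlapping batches; whether that is related to the exception in this thread is only an assumption.

```scala
object InverseReduceCheck {
  // Reduce: concatenate the two lists into a new list (inputs are not mutated).
  def reduce(visitorSet1: List[String], visitorSet2: List[String]): List[String] =
    visitorSet1 ++ visitorSet2

  // Inverse reduce: drop every element that also occurs in the leaving batch.
  def invReduce(visitorSet1: List[String], visitorSet2: List[String]): List[String] =
    visitorSet1.filterNot(visitorSet2.contains(_))

  def main(args: Array[String]): Unit = {
    // Hypothetical batches: visitor "b" appears in both.
    val batch1 = List("a", "b")
    val batch2 = List("b", "c")

    val window = reduce(batch1, batch2)         // both batches are in the window
    val afterSlide = invReduce(window, batch1)  // batch1 leaves the window

    println(window)
    println(afterSlide)
    // A true inverse would give back batch2 here; this one drops "b" as well.
    println(afterSlide == batch2)
  }
}
```

In this sketch the window after the slide no longer contains "b", although batch2 is still inside the window. If a filter function then removes keys whose collection is empty, a key could disappear earlier than expected.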



On Wed, Jun 7, 2017 at 8:43 AM, swetha kasireddy <swethakasire...@gmail.com>
wrote:

> I changed the data structure to scala.collection.immutable.Set and I still
> see the same issue. My key is a String. I do the following in my reduce
> and invReduce.
>
> visitorSet1 ++ visitorSet2.toTraversable
>
> visitorSet1 -- visitorSet2.toTraversable
>
> On Tue, Jun 6, 2017 at 8:22 PM, Tathagata Das <tathagata.das1...@gmail.com>
> wrote:
>
>> Yes, and in general any mutable data structure. You have to use immutable
>> data structures whose hashCode and equals are consistent enough for them
>> to be put in a set.
>>
>> On Jun 6, 2017 4:50 PM, "swetha kasireddy" <swethakasire...@gmail.com>
>> wrote:
>>
>>> Are you suggesting against the usage of HashSet?
>>>
>>> On Tue, Jun 6, 2017 at 3:36 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> This may be because HashSet is a mutable data structure, and it seems
>>>> you are actually mutating it in "set1 ++set2". I suggest creating a new
>>>> HashSet in the function (and adding both sets to it), rather than
>>>> mutating one of them.
>>>>
>>>> On Tue, Jun 6, 2017 at 11:30 AM, SRK <swethakasire...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I see the following error when I use reduceByKeyAndWindow in my Spark
>>>>> Streaming app. I use reduce, invReduce and filterFunction as shown
>>>>> below. Any idea as to why I get the error?
>>>>>
>>>>> java.lang.Exception: Neither previous window has value for key, nor new
>>>>> values found. Are you sure your key class hashes consistently?
>>>>>
>>>>>
>>>>>   // Reduce: keep the latest timestamp and the union of both sets.
>>>>>   def reduceWithHashSet: ((Long, HashSet[String]), (Long, HashSet[String])) => (Long, HashSet[String]) = {
>>>>>     case ((timeStamp1: Long, set1: HashSet[String]), (timeStamp2: Long, set2: HashSet[String])) =>
>>>>>       (Math.max(timeStamp1, timeStamp2), set1 ++ set2)
>>>>>   }
>>>>>
>>>>>   // Inverse reduce: remove the elements of the batch leaving the window.
>>>>>   def invReduceWithHashSet: ((Long, HashSet[String]), (Long, HashSet[String])) => (Long, HashSet[String]) = {
>>>>>     case ((timeStamp1: Long, set1: HashSet[String]), (timeStamp2: Long, set2: HashSet[String])) =>
>>>>>       (Math.max(timeStamp1, timeStamp2), set1.diff(set2))
>>>>>   }
>>>>>
>>>>>   // Filter: keep only keys whose set is non-empty.
>>>>>   def filterFuncForInvReduce: ((String, (Long, HashSet[String]))) => Boolean = {
>>>>>     case (metricName: String, (timeStamp: Long, set: HashSet[String])) =>
>>>>>       set.nonEmpty
>>>>>   }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Exception-which-using-ReduceByKeyAndWindow-in-Spark-Streaming-tp28748.html
>>>>
>>>
>
