Hi Aljoscha,

Thank you for the explanation and the link on IBM InfoSphere. That explains why I am seeing (a,3) and (b,3) in my example.
Yes, the name Evictor is confusing.

Thanks and Regards,
Vishnu Viswanath,
www.vishnuviswanath.com

On Mon, Mar 14, 2016 at 11:24 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> sure, the evictors are a bit confusing (especially the fact that they are
> called evictors). They should more correctly be called “Keepers”. The process
> is the following:
>
> 1. Trigger fires
> 2. Evictor decides what elements to keep, so a CountEvictor.of(3) says,
>    keep only three elements, all others are evicted
> 3. Elements that remain after the evictor are used for processing
>
> We mostly have Evictors for legacy reasons nowadays since the original
> window implementation was based on ideas in IBM InfoSphere Streams. See
> this part of their documentation for some explanation:
> https://www.ibm.com/support/knowledgecenter/#!/SSCRJU_4.0.0/com.ibm.streams.dev.doc/doc/windowhandling.html
>
> - aljoscha
>
> On 14 Mar 2016, at 17:04, Vishnu Viswanath <vishnu.viswanat...@gmail.com> wrote:
> >
> > Hi Aljoscha,
> >
> > Wow, great illustration.
> >
> > That was a very clear explanation. Yes, I did enter the elements fast for
> > case b, and I was seeing more of case a.
> > Also, sometimes I have seen a window getting triggered when I enter 1 or
> > 2 elements; I believe that is an extension of case a, w.r.t. window 2.
> >
> > Also, can you explain the case when using an Evictor?
> > e.g.,
> >
> > val counts = socTextStream.flatMap{_.split("\\s")}
> >   .map { (_, 1) }
> >   .keyBy(0)
> >   .window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(10)))
> >   .trigger(CountTrigger.of(5))
> >   .evictor(CountEvictor.of(3))
> >   .sum(1).setParallelism(4);
> >
> > counts.print()
> > sev.execute()
> >
> > for the input
> >
> > a
> > a
> > a
> > a
> > a
> > b
> > b
> > b
> > b
> > b
> >
> > I got the output as
> >
> > 1> (a,3)
> > 1> (b,3)
> > 2> (b,3)
> >
> > My assumption was that, when the Trigger fires, the processing
> > will be done on all the items in the window,
> > and then 3 items will be evicted from the window, which can also be part
> > of the next processing of that window. But
> > here it looks like the sum is calculated only on the items that were
> > evicted from the window.
> >
> > Could you please explain what is going on here?
> >
> > Thanks and Regards,
> > Vishnu Viswanath,
> > www.vishnuviswanath.com
> >
> > On Mon, Mar 14, 2016 at 5:27 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> > Hi,
> > I created a visualization to help explain the situation:
> > http://s21.postimg.org/dofhcw52f/window_example.png
> >
> > <window example.png>
> >
> > The SlidingProcessingTimeWindows assigner assigns elements to windows
> > based on the current processing time. The CountTrigger only fires if a
> > window contains 5 elements (or more). In your test the windows for a, c and
> > e fell into case b because you probably entered the letters very fast. For
> > elements b and d we have case a: the elements were far enough apart, or you
> > happened to enter them right on a window boundary, such that only one window
> > contains all of them. The other windows don’t contain enough elements to
> > reach 5. In my drawing window 1 contains 5 elements while window 2 only
> > contains 3 of those elements.
> >
> > I hope this helps.
> >
> > Cheers,
> > Aljoscha
> >> On 12 Mar 2016, at 19:19, Vishnu Viswanath <vishnu.viswanat...@gmail.com> wrote:
> >>
> >> Hi All,
> >>
> >> I have the below code
> >>
> >> val sev = StreamExecutionEnvironment.getExecutionEnvironment
> >> val socTextStream = sev.socketTextStream("localhost",4444)
> >>
> >> val counts = socTextStream.flatMap{_.split("\\s")}
> >>   .map { (_, 1) }
> >>   .keyBy(0)
> >>   .window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(10)))
> >>   .trigger(CountTrigger.of(5))
> >>   .sum(1)
> >>
> >> counts.print()
> >> sev.execute()
> >>
> >> I am sending messages to the port 4444 using nc -lk 4444
> >> This is my sample input
> >>
> >> a
> >> a
> >> a
> >> a
> >> a
> >> b
> >> b
> >> b
> >> b
> >> b
> >> c
> >> c
> >> c
> >> c
> >> c
> >> d
> >> d
> >> d
> >> d
> >> d
> >> e
> >> e
> >> e
> >> e
> >> e
> >>
> >> I am sending 5 of each letter since I have a CountTrigger of 5. I was
> >> expecting that for every 5 characters, the code will print 5, i.e., (a,5)
> >> (b,5) etc. But the output I am getting is a little confusing.
> >> Output:
> >>
> >> 1> (a,5)
> >> 1> (a,5)
> >> 1> (b,5)
> >> 2> (c,5)
> >> 2> (c,5)
> >> 1> (d,5)
> >> 1> (e,5)
> >> 1> (e,5)
> >>
> >> As you can see, for some characters the count is printed twice (a, c, e)
> >> and for some characters it is printed only once (b, d). I am not able to
> >> figure out what is going on. I think it may have something to do with the
> >> SlidingProcessingTimeWindows but I am not sure.
> >> Can someone explain to me what is going on?
> >>
> >> Thanks and Regards,
> >> Vishnu Viswanath
> >> www.vishnuviswanath.com
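
For readers who want to replay the behaviour discussed in this thread without a Flink cluster, here is a minimal plain-Scala sketch. It is not Flink code. The names `WindowSketch`, `windowsFor`, `simulate` and `sample` are hypothetical, and the timestamps are made up. It assumes what the thread describes: each element is assigned to every sliding window that covers its timestamp, the count trigger fires at 5 elements and then starts counting again, and the evictor keeps only the newest elements before the sum is computed. Window expiry and parallelism are not modelled.

```scala
import scala.collection.mutable

object WindowSketch {
  // A window is a half-open interval [start, end) in milliseconds.
  type Window = (Long, Long)

  // All sliding windows of length `size`, starting every `slide` ms,
  // that contain the timestamp `ts`.
  def windowsFor(ts: Long, size: Long, slide: Long): Seq[Window] = {
    val lastStart = ts - (ts % slide)
    (lastStart until (ts - size) by -slide).map(start => (start, start + size))
  }

  // Replays (key, timestamp) events. Each (key, window) pair has its own buffer
  // and its own trigger counter. keepCount = Some(n) mimics the "keep n" step
  // described in the thread; None means no evictor.
  def simulate(events: Seq[(String, Long)], size: Long, slide: Long,
               triggerCount: Int, keepCount: Option[Int]): Vector[(String, Int)] = {
    val buffers = mutable.Map.empty[(String, Window), Vector[Int]]
      .withDefaultValue(Vector.empty)
    val sinceFire = mutable.Map.empty[(String, Window), Int].withDefaultValue(0)
    val out = Vector.newBuilder[(String, Int)]

    for ((key, ts) <- events; w <- windowsFor(ts, size, slide)) {
      val id = (key, w)
      buffers(id) = buffers(id) :+ 1   // the (word, 1) pair; only the 1 is stored
      sinceFire(id) += 1
      if (sinceFire(id) >= triggerCount) {
        sinceFire(id) = 0              // assumption: the trigger restarts its count
        // Step 2 of the thread: keep only the newest n elements.
        val kept = keepCount.fold(buffers(id))(n => buffers(id).takeRight(n))
        buffers(id) = kept             // assumption: evicted elements are dropped
        out += ((key, kept.sum))       // step 3: sum over what remains
      }
    }
    out.result()
  }

  // Made-up timings: the a's all fall inside two overlapping windows,
  // while the b's straddle the 20 s boundary.
  val sample: Seq[(String, Long)] =
    Seq(12000L, 12500L, 13000L, 13500L, 14000L).map(t => ("a", t)) ++
    Seq(18000L, 19000L, 20500L, 21000L, 22000L).map(t => ("b", t))
}
```

With these made-up timings, `WindowSketch.simulate(WindowSketch.sample, 20000L, 10000L, 5, None)` gives (a,5) twice and (b,5) once, because both windows covering the a's reach 5 elements while only one window holds all five b's. Passing `Some(3)` instead of `None` turns every 5 into a 3, which matches the "Keepers" reading of the evictor. The exact pattern of duplicates in a real run depends on when the elements arrive.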