Re: Behavior of SlidingProcessingTimeWindow with CountTrigger

2016-03-14 Thread Vishnu Viswanath
Hi Aljoscha,

Thank you for the explanation and the link to the IBM InfoSphere documentation.
That explains why I am seeing (a,3) and (b,3) in my example.

Yes, the name Evictor is confusing.

Thanks and Regards,
Vishnu Viswanath,
www.vishnuviswanath.com


Re: Behavior of SlidingProcessingTimeWindow with CountTrigger

2016-03-14 Thread Aljoscha Krettek
Hi,
Sure, the evictors are a bit confusing (especially the fact that they are
called evictors). They would more correctly be called “Keepers”. The process is
the following:

1. The trigger fires.
2. The evictor decides which elements to keep. CountEvictor.of(3) says “keep
only three elements” (the last three in the window buffer), and all others are
evicted.
3. The elements that remain after eviction are used for processing.
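The three steps above can be modelled in a few lines of plain Scala. This is only a sketch, not Flink code: `countEvict` and `evaluate` are hypothetical helpers, and the sketch assumes the window buffer is in arrival order and that a count evictor keeps the last n elements of it. It shows why five "a" records reaching a window produce (a,3) rather than (a,5).

```scala
// Plain-Scala model of one window evaluation with an evictor; this is not Flink code.
// Assumption: the buffer is in arrival order and a count evictor keeps its last n elements.

// Step 2: decide which elements to keep; everything else is evicted.
def countEvict[T](buffer: Seq[T], maxCount: Int): Seq[T] =
  buffer.takeRight(maxCount)

// Steps 1-3: the trigger has fired, the evictor prunes, and only the kept
// elements reach the window function (here a sum over tuple field 1).
def evaluate(buffer: Seq[(String, Int)], maxCount: Int): (String, Int) = {
  val kept = countEvict(buffer, maxCount)
  (kept.head._1, kept.map(_._2).sum) // assumes a non-empty window, as when a trigger fires
}

// Five ("a", 1) records reach a window, so CountTrigger.of(5) fires.
val windowForA = Seq.fill(5)(("a", 1))
println(evaluate(windowForA, 3))  // (a,3): the sum sees only the 3 kept elements
println(windowForA.map(_._2).sum) // 5: what summing before eviction would have given
```

The second line printed corresponds to the "process everything, then evict" order; the evictor works the other way round.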

Nowadays we mostly have Evictors for legacy reasons, since the original window
implementation was based on ideas from IBM InfoSphere Streams. See this part of
their documentation for some explanation:
https://www.ibm.com/support/knowledgecenter/#!/SSCRJU_4.0.0/com.ibm.streams.dev.doc/doc/windowhandling.html

- aljoscha



Re: Behavior of SlidingProcessingTimeWindow with CountTrigger

2016-03-14 Thread Vishnu Viswanath
Hi Aljoscha,

Wow, great illustration.

That was a very clear explanation. Yes, I did enter the elements quickly for
case B, and I was seeing case A more often.
Also, I have sometimes seen a window fire when I entered only 1 or 2
elements; I believe that is an extension of case A with respect to window 2.
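That reasoning can be checked with a tiny plain-Scala sketch for one key and one window. It is not Flink code: `firings` is a hypothetical helper, and it assumes window 2 has not fired yet, that CountTrigger fires whenever the count since its last firing reaches 5, and that each firing sums every element currently in the window.

```scala
// Minimal sketch of one window's CountTrigger.of(5) for a single key; not Flink code.
// Assumptions: the window has not fired yet, so its trigger count equals the number of
// elements it already holds, and each firing emits a sum over every element in the window.
val threshold = 5

// For `newElements` records arriving after `alreadyInWindow` records, returns
// (index of the new record that causes a firing, sum emitted by that firing).
def firings(alreadyInWindow: Int, newElements: Int): List[(Int, Int)] =
  (1 to newElements).toList.collect {
    // The count resets after each firing, so firings happen at every multiple of 5.
    case i if (alreadyInWindow + i) % threshold == 0 => (i, alreadyInWindow + i)
  }

// Window 2 kept 3 of the previous burst's elements: the 2nd new element fires it.
println(firings(alreadyInWindow = 3, newElements = 2)) // List((2,5))
// With 4 already inside, a single new element is enough.
println(firings(alreadyInWindow = 4, newElements = 1)) // List((1,5))
```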

Could you also explain what happens when an Evictor is used?
For example:


val counts = socTextStream.flatMap{_.split("\\s")}
  .map { (_, 1) }
  .keyBy(0)
  .window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10)))
  .trigger(CountTrigger.of(5))
  .evictor(CountEvictor.of(3))
  .sum(1).setParallelism(4)

counts.print()
sev.execute()

For the input:

a
a
a
a
a
b
b
b
b
b

I got the following output:

1> (a,3)
1> (b,3)
2> (b,3)

My assumption was that when the trigger fires, the processing would be done on
all of the items in the window, and that 3 items would then be evicted from the
window, which could also be part of the next processing of that window. But
here it looks like the sum is calculated only on the items that were
evicted from the window.

Could you please explain what is going on here?


Thanks and Regards,
Vishnu Viswanath,
www.vishnuviswanath.com



Re: Behavior of SlidingProcessingTimeWindow with CountTrigger

2016-03-14 Thread Aljoscha Krettek
Hi,
I created a visualization to help explain the situation: 
http://s21.postimg.org/dofhcw52f/window_example.png

The SlidingProcessingTimeWindows assigner assigns elements to windows based on
the current processing time. With a size of 20 seconds and a slide of 10
seconds, every element belongs to two overlapping windows. The CountTrigger
only fires if a window contains 5 elements (or more). In your test the windows
for a, c and e fell into case B, probably because you entered the letters very
fast: all five elements ended up in both overlapping windows, so both windows
fired and the count was printed twice. For b and d we have case A: the elements
were far enough apart, or you happened to enter them right on a window
boundary, such that only one window contains all of them. The other windows
don’t contain enough elements to reach 5. In my drawing, window 1 contains 5
elements while window 2 only contains 3 of those elements.
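The two cases can be reproduced with a small plain-Scala simulation. It is a sketch under stated assumptions rather than Flink's implementation: window start times are assumed to be aligned to multiples of the slide, CountTrigger is assumed to reset its count after firing without purging the window, `windowStarts` and `run` are hypothetical helpers, and the arrival times are made-up millisecond offsets standing in for processing time.

```scala
// Plain-Scala model of SlidingProcessingTimeWindows.of(20 s, 10 s) + CountTrigger.of(5).
// Not Flink code. Assumptions: window starts are aligned to multiples of the slide,
// and the trigger resets its count after firing without purging the window contents.
import scala.collection.mutable

val sizeMs = 20000L
val slideMs = 10000L
val triggerCount = 5

// Start times of all windows that contain an element arriving at time t (t >= 0).
def windowStarts(t: Long): List[Long] = {
  val lastStart = t - (t % slideMs)
  Iterator.iterate(lastStart)(_ - slideMs).takeWhile(_ > t - sizeMs).toList
}

// Feeds (key, arrivalTimeMs) records through the windows and returns every firing
// as (key, sum over the whole window), in the order the firings happen.
def run(records: Seq[(String, Long)]): List[(String, Int)] = {
  val windowSum = mutable.Map.empty[(String, Long), Int].withDefaultValue(0)
  val sinceFire = mutable.Map.empty[(String, Long), Int].withDefaultValue(0)
  val fired = mutable.ListBuffer.empty[(String, Int)]
  for ((key, t) <- records; start <- windowStarts(t)) {
    val w = (key, start)
    windowSum(w) += 1 // each record is (word, 1)
    sinceFire(w) += 1
    if (sinceFire(w) >= triggerCount) {
      fired += ((key, windowSum(w))) // fire: emit the sum of everything in the window
      sinceFire(w) = 0               // count resets; window contents are kept
    }
  }
  fired.toList
}

// Case B: all five "a" arrive within one 10 s slide, so both overlapping windows fire.
println(run(Seq(1000L, 1500L, 2000L, 2500L, 3000L).map(t => ("a", t)))) // List((a,5), (a,5))
// Case A: the five "b" straddle the 20 s boundary; only the window starting at 10 s
// sees all five, while its neighbours see 3 and 2.
println(run(Seq(17000L, 18000L, 19000L, 21000L, 22000L).map(t => ("b", t)))) // List((b,5))
```

In case A the window that ends up holding 3 elements only needs 2 more of the same key to fire later.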

I hope this helps.

Cheers,
Aljoscha



Behavior of SlidingProcessingTimeWindow with CountTrigger

2016-03-12 Thread Vishnu Viswanath
Hi All,


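I have the following code:


import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

val sev = StreamExecutionEnvironment.getExecutionEnvironment
val socTextStream = sev.socketTextStream("localhost",)

val counts = socTextStream.flatMap{_.split("\\s")}
  .map { (_, 1) }                // one (word, 1) tuple per word
  .keyBy(0)                      // key by the word
  .window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10))) // 20 s windows, sliding every 10 s
  .trigger(CountTrigger.of(5))   // fire a window each time 5 elements have arrived in it
  .sum(1)                        // sum the 1s per word

counts.print()
sev.execute()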

I am sending messages to the port  using nc -lk 
This is my sample input

a
a
a
a
a
b
b
b
b
b
c
c
c
c
c
d
d
d
d
d
e
e
e
e
e

I am sending 5 of each letter since I have a CountTrigger of 5. I was
expecting that for each group of 5 letters the code would print 5, i.e., (a,5),
(b,5), etc. But the output I am getting is a little confusing.
Output:

1> (a,5)
1> (a,5)
1> (b,5)
2> (c,5)
2> (c,5)
1> (d,5)
1> (e,5)
1> (e,5)

As you can see, for some characters the count is printed twice (a, c, e) and
for others it is printed only once (b, d). I am not able to figure
out what is going on. I think it may have something to do with the
SlidingProcessingTimeWindow, but I am not sure.
Can someone explain to me what is going on?


Thanks and Regards,
Vishnu Viswanath
www.vishnuviswanath.com