Hi Aljoscha,

Wow, great illustration.

That was a very clear explanation. Yes, I did enter the elements quickly for
case b, and I was seeing more instances of case a.
Also, I have sometimes seen a window get triggered when I enter only 1 or 2
elements. I believe that is an extension of case a, with respect to window 2.
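
To check my understanding of the two cases, here is a minimal plain-Scala sketch of how overlapping sliding windows (size 20 s, slide 10 s) pick up elements. It does not use Flink at all: `windowStarts` and `countsPerWindow` are hypothetical helpers written for illustration, the timestamps are made up, and the assignment rule is an assumption modelled on the description in the quoted reply, not Flink's actual implementation.

```scala
object SlidingWindowSketch {
  // Hypothetical helper (not Flink API): start times, in ms, of every sliding
  // window of length `size` and step `slide` that covers `timestamp`.
  // Assumes a non-negative timestamp.
  def windowStarts(timestamp: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = timestamp - (timestamp % slide)
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(_ > timestamp - size)
      .toList
  }

  // Number of elements that land in each window, keyed by window start.
  def countsPerWindow(timestamps: Seq[Long], size: Long, slide: Long): Map[Long, Int] =
    timestamps
      .flatMap(windowStarts(_, size, slide))
      .groupBy(identity)
      .map { case (start, hits) => start -> hits.size }

  // Window starts whose element count reaches the trigger threshold.
  def windowsReaching(threshold: Int, counts: Map[Long, Int]): List[Long] =
    counts.filter(_._2 >= threshold).keys.toList.sorted

  def main(args: Array[String]): Unit = {
    val size = 20000L
    val slide = 10000L

    // Case b: five elements typed quickly, all inside the same slide interval.
    val fast = Seq(12000L, 12200L, 12400L, 12600L, 12800L)
    println(windowsReaching(5, countsPerWindow(fast, size, slide)))   // List(0, 10000)

    // Case a: five elements that straddle a window boundary at 10 s.
    val spread = Seq(8000L, 9000L, 10000L, 11000L, 12000L)
    println(windowsReaching(5, countsPerWindow(spread, size, slide))) // List(0)
  }
}
```

In the first run both overlapping windows hold all 5 elements, so a count trigger of 5 would fire twice. In the second run only the window starting at 0 holds all 5, while the window starting at 10000 holds just 3 and would need 2 more elements before firing, which would match a window triggering after I enter only 1 or 2 elements.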

Also, could you explain what happens when an Evictor is used?
For example:


val counts = socTextStream.flatMap{_.split("\\s")}
  .map { (_, 1) }
  .keyBy(0)
  .window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(10)))
  .trigger(CountTrigger.of(5))
  .evictor(CountEvictor.of(3))
  .sum(1).setParallelism(4);

counts.print()
sev.execute()

for the input


a
a
a
a
a
b
b
b
b
b

I got the output as


1> (a,3)
1> (b,3)
2> (b,3)

My assumption was that, when the Trigger fires, the processing is done on
all of the items in the window, and then 3 items are evicted from the
window, while the remaining items can still be part of the next processing
of that window. But here it looks like the sum is calculated only on the
items that were evicted from the window.

Could you please explain what is going on here.
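
To make the two readings concrete, here is a small plain-Scala sketch that does not use Flink. Both functions are hypothetical models written for illustration: `sumThenEvict` is the behaviour I assumed, and `evictThenSum` is an alternative in which only the newest 3 elements are kept before the sum is computed. That the second one corresponds to what CountEvictor.of(3) does is an assumption based on the output above, not something taken from the Flink source.

```scala
object EvictorSketch {
  // Reading 1 (my assumption): sum every element in the window, and only
  // afterwards drop the `evictCount` oldest elements.
  // Returns the emitted sum and the elements left in the window.
  def sumThenEvict(window: Seq[Int], evictCount: Int): (Int, Seq[Int]) =
    (window.sum, window.drop(evictCount))

  // Reading 2 (alternative): keep only the newest `keep` elements first,
  // then sum what is left.
  def evictThenSum(window: Seq[Int], keep: Int): Int =
    window.takeRight(keep).sum

  def main(args: Array[String]): Unit = {
    // Five occurrences of "a", each mapped to (a, 1) as in the job above.
    val window = Seq(1, 1, 1, 1, 1)

    println(sumThenEvict(window, 3)._1) // 5
    println(evictThenSum(window, 3))    // 3
  }
}
```

Only the second reading reproduces the (a,3) and (b,3) I am seeing, so it looks as if the eviction is applied before the sum rather than after it.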


Thanks and Regards,
Vishnu Viswanath,
www.vishnuviswanath.com

On Mon, Mar 14, 2016 at 5:27 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> I created a visualization to help explain the situation:
> http://s21.postimg.org/dofhcw52f/window_example.png
> The SlidingProcessingTimeWindows assigner assigns elements to windows
> based on the current processing time. The CountTrigger only fires if a
> window contains 5 elements (or more). In your test the windows for a, c and
> e fell into case b because you probably entered the letters very fast. For
> elements b and d we have case a. The elements were far enough apart or you
> happened to enter them right on a window boundary such that only one window
> contains all of them. The other windows don’t contain enough elements to
> reach 5. In my drawing window 1 contains 5 elements while window 2 only
> contains 3 of those elements.
>
> I hope this helps.
>
> Cheers,
> Aljoscha
>
> On 12 Mar 2016, at 19:19, Vishnu Viswanath <vishnu.viswanat...@gmail.com>
> wrote:
>
> Hi All,
>
>
> I have the below code
>
>
> val sev = StreamExecutionEnvironment.getExecutionEnvironment
> val socTextStream = sev.socketTextStream("localhost",4444)
>
> val counts = socTextStream.flatMap{_.split("\\s")}
>   .map { (_, 1) }
>   .keyBy(0)
>   .window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(10)))
>   .trigger(CountTrigger.of(5))
>   .sum(1)
>
> counts.print()
> sev.execute()
>
> I am sending messages to the port 4444 using nc -lk 4444
> This is my sample input
>
> a
> a
> a
> a
> a
> b
> b
> b
> b
> b
> c
> c
> c
> c
> c
> d
> d
> d
> d
> d
> e
> e
> e
> e
> e
>
> I am sending 5 of each letter since I have a CountTrigger of 5. I was
> expecting that for every 5 characters the code would print 5, i.e., (a,5),
> (b,5), etc. But the output I am getting is a little confusing.
> Output:
>
> 1> (a,5)
> 1> (a,5)
> 1> (b,5)
> 2> (c,5)
> 2> (c,5)
> 1> (d,5)
> 1> (e,5)
> 1> (e,5)
>
> As you can see, for some characters the count is printed twice (a, c, e) and
> for some characters it is printed only once (b, d). I am not able to figure
> out what is going on. I think it may have something to do with
> SlidingProcessingTimeWindows, but I am not sure.
> Can someone explain to me what is going on?
>
>
> Thanks and Regards,
> Vishnu Viswanath
> www.vishnuviswanath.com
>
>
>
