First of all you are right about number of elements, my bad and sorry for
the confusion, I need to be better in calculations :)

However: if I change parallelism to. lets say 2 in windowing, i.e. instead
of (of course I changed 29 to 30 as well :) )

}).print();

put

}).setParallelism(2).print();

at the very bottom - I am getting:

3> 15
3> 12
2> 9
2> 6
4> 18
04/21/2016 07:47:08     Sink: Unnamed(2/4) switched to FINISHED
04/21/2016 07:47:08     Source: Custom Source(1/1) switched to FINISHED
04/21/2016 07:47:08     Sink: Unnamed(4/4) switched to FINISHED
04/21/2016 07:47:08     Sink: Unnamed(3/4) switched to FINISHED
04/21/2016 07:47:08     TriggerWindow(GlobalWindows(),
PurgingTrigger(CountTrigger(10)),
AllWindowedStream.apply(AllWindowedStream.java:230))(2/2) switched to
FINISHED
04/21/2016 07:47:08     TriggerWindow(GlobalWindows(),
PurgingTrigger(CountTrigger(10)),
AllWindowedStream.apply(AllWindowedStream.java:230))(1/2) switched to
FINISHED
1> 3
1> 0

With default setting for parallelism it works fine, same as with value 3
and 1.

With 2, 4+ it does not work. With 4+ it simply prints nothing. I.e. it
might be smth with how threads are finishing their execution?

I am using the latest prod version I've found in maven: 1.0.1.
Can snapshot versions be used in prod? I mean how well tested are those?

I will try the same on master branch later today.

Thanks!
Kostya


On Thu, Apr 21, 2016 at 6:38 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> which version of Flink are you using? Maybe there is a bug. I've tested it
> on the git master (1.1-SNAPSHOT) and it works fine with varying degrees of
> parallelism if I change the source to emit 30 elements:
> LongStream.range(0, 30).forEach(ctx::collect);
>
> (The second argument of LongStream.range(start, end) is exclusive)
>
> Cheers,
> Aljoscha
>
>
>
> On Thu, 21 Apr 2016 at 11:44 Kostya Kulagin <kkula...@gmail.com> wrote:
>
>> Actually this is not true - the source emits 30 values since it is
>> started with 0. If I change 29 to 33 result will be the same.
>> I can get all values if I play with parallelism. I.e putting parallel 1
>> before print.
>> Or if I change 29 to 39 ( I have 4 cors)
>> I can guess that there is smth wrong with threads. BTW in this case how
>> threads are created and how data flows between?
>> On Apr 21, 2016 4:50 AM, "Aljoscha Krettek" <aljos...@apache.org> wrote:
>>
>>> Hi,
>>> this is related to your other question about count windows. The source
>>> emits 29 values so we only have two count-windows with 10 elements each.
>>> The last window is never triggered.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Wed, 20 Apr 2016 at 23:52 Kostya Kulagin <kkula...@gmail.com> wrote:
>>>
>>>> I think it has smth to do with parallelism and I probably do not have
>>>> clear understanding how parallelism works in flink but in this example:
>>>>
>>>>     StreamExecutionEnvironment env = 
>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>     DataStreamSource<Long> source = env.addSource(new 
>>>> SourceFunction<Long>() {
>>>>
>>>>       @Override
>>>>       public void run(SourceContext<Long> ctx) throws Exception {
>>>>         LongStream.range(0, 29).forEach(ctx::collect);
>>>>       }
>>>>
>>>>       @Override
>>>>       public void cancel() {
>>>>
>>>>       }
>>>>     });
>>>>
>>>>     source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, 
>>>> GlobalWindow>() {
>>>>       @Override
>>>>       public void apply(GlobalWindow window, Iterable<Long> values, 
>>>> Collector<Long> out) throws Exception {
>>>>         for (Long value : values) {
>>>>           if (value % 3 == 0) {
>>>>             out.collect(value);
>>>>           }
>>>>         }
>>>>       }
>>>>     }).print();
>>>>
>>>>     env.execute("yoyoyo");
>>>>
>>>> Why my output is like this:
>>>>
>>>> 4> 9
>>>> 1> 0
>>>> 1> 12
>>>> 3> 6
>>>> 3> 18
>>>> 2> 3
>>>> 2> 15
>>>>
>>>> ? I.e. where id s value of 24 for example? I expect to see it. What am
>>>> I doing wrong?
>>>>
>>>

Reply via email to