Thanks, so you were right: it really is connected to the problem of windows
not finishing that I mentioned in the other post.
I don't really need a parallelism of 1 for the windows: I expect the operations
on windows to be pretty expensive, and I like the idea that I can "parallelize"
them.

Thanks for the explanation!

On Thu, Apr 21, 2016 at 8:06 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> no worries, I also had to read the doc to figure it out. :-)
>
> I now see what the problem is. The .countWindowAll().apply() pattern
> creates a WindowOperator with a parallelism of 1 because "count all" only
> works if one instance of the window operator sees all elements. When you
> manually change the parallelism, it essentially becomes a "count per
> parallel instance" window operation, and the elements from the source
> (which has parallelism 1) get distributed round-robin to the parallel
> instances of the count-window operator. This means that more elements must
> be emitted from the source before each instance of the window fires. It's a
> bit confusing, but right now Flink does not enforce the parallelism of 1 for
> this operator, so it can be overridden.
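To make the round-robin effect concrete, here is a minimal plain-Java sketch (a simulation, not Flink code or its API) of N independent purging count windows of size 10, fed by one source emitting 0..29 and applying the same "value % 3 == 0" filter as the program in this thread. The names ParallelCountWindowSim, emitted and WINDOW_SIZE are hypothetical, and the sketch assumes strict round-robin starting at instance 0; the real partitioner may start at a different instance, but each instance still receives a roughly equal share.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation (NOT Flink code) of a purging count window of size
// WINDOW_SIZE running as `parallelism` independent instances, fed round-robin
// by a single source that emits 0..numElements-1.
public class ParallelCountWindowSim {

    static final int WINDOW_SIZE = 10; // mirrors countWindowAll(10)

    // Returns the values the "value % 3 == 0" window function would emit.
    static List<Long> emitted(long numElements, int parallelism) {
        // One buffer per parallel window instance.
        List<List<Long>> buffers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            buffers.add(new ArrayList<>());
        }
        List<Long> out = new ArrayList<>();
        for (long v = 0; v < numElements; v++) {
            // Assumption: strict round-robin starting at instance 0.
            List<Long> buf = buffers.get((int) (v % parallelism));
            buf.add(v);
            if (buf.size() == WINDOW_SIZE) {
                // This instance reached its count: fire, then purge.
                for (long x : buf) {
                    if (x % 3 == 0) {
                        out.add(x);
                    }
                }
                buf.clear();
            }
        }
        // Elements still buffered here never fire: the end of the input
        // does not complete a count window.
        return out;
    }

    public static void main(String[] args) {
        for (int p : new int[] {1, 2, 3, 4}) {
            System.out.println("parallelism " + p + ": " + emitted(30, p));
        }
    }
}
```

Under these assumptions it prints [0, 3, 6, 9, 12, 15, 18, 21, 24, 27] for parallelism 1 and 3, [0, 6, 12, 18, 3, 9, 15] for parallelism 2 (the same values as the parallelism-2 output quoted in this thread, up to ordering), and [] for parallelism 4, where each instance receives only 7 or 8 of the 30 elements and no window ever reaches 10.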
>
> About using the snapshot version: I would suggest not using it unless you
> absolutely need one of its features that is not yet released. The builds
> are still pretty stable, however.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 13:53 Kostya Kulagin <kkula...@gmail.com> wrote:
>
>> First of all, you are right about the number of elements; my bad, and sorry
>> for the confusion. I need to get better at calculations :)
>>
>> However, if I change the parallelism of the windowing to, let's say, 2, i.e.
>> instead of (of course I changed 29 to 30 as well :) )
>>
>> }).print();
>>
>> put
>>
>> }).setParallelism(2).print();
>>
>> at the very bottom, I get:
>>
>> 3> 15
>> 3> 12
>> 2> 9
>> 2> 6
>> 4> 18
>> 04/21/2016 07:47:08  Sink: Unnamed(2/4) switched to FINISHED
>> 04/21/2016 07:47:08  Source: Custom Source(1/1) switched to FINISHED
>> 04/21/2016 07:47:08  Sink: Unnamed(4/4) switched to FINISHED
>> 04/21/2016 07:47:08  Sink: Unnamed(3/4) switched to FINISHED
>> 04/21/2016 07:47:08  TriggerWindow(GlobalWindows(), PurgingTrigger(CountTrigger(10)), AllWindowedStream.apply(AllWindowedStream.java:230))(2/2) switched to FINISHED
>> 04/21/2016 07:47:08  TriggerWindow(GlobalWindows(), PurgingTrigger(CountTrigger(10)), AllWindowedStream.apply(AllWindowedStream.java:230))(1/2) switched to FINISHED
>> 1> 3
>> 1> 0
>>
>> With the default parallelism setting it works fine, and likewise with values
>> 3 and 1.
>>
>> With 2 and 4+ it does not work; with 4+ it prints nothing at all. Could it
>> be something to do with how the threads finish their execution?
>>
>> I am using the latest release version I found in Maven: 1.0.1.
>> Can snapshot versions be used in production? I mean, how well tested are they?
>>
>> I will try the same on master branch later today.
>>
>> Thanks!
>> Kostya
>>
>>
>> On Thu, Apr 21, 2016 at 6:38 AM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>> which version of Flink are you using? Maybe there is a bug. I've tested
>>> it on the git master (1.1-SNAPSHOT) and it works fine with varying degrees
>>> of parallelism if I change the source to emit 30 elements:
>>> LongStream.range(0, 30).forEach(ctx::collect);
>>>
>>> (The second argument of LongStream.range(start, end) is exclusive)
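A quick JDK-only check of those bounds (RangeBounds is a hypothetical class name used just for this illustration). LongStream.rangeClosed is the inclusive counterpart, in case the intent is for the end value itself to be emitted.

```java
import java.util.stream.LongStream;

// Shows that LongStream.range(start, end) excludes `end`,
// while LongStream.rangeClosed(start, end) includes it.
public class RangeBounds {
    public static void main(String[] args) {
        System.out.println(LongStream.range(0, 29).count());           // 29 elements: 0..28
        System.out.println(LongStream.range(0, 30).count());           // 30 elements: 0..29
        System.out.println(LongStream.rangeClosed(0, 29).count());     // 30 elements: 0..29
        System.out.println(LongStream.range(0, 29).max().getAsLong()); // largest value: 28
    }
}
```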
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>>
>>>
>>> On Thu, 21 Apr 2016 at 11:44 Kostya Kulagin <kkula...@gmail.com> wrote:
>>>
>>>> Actually this is not true: the source emits 30 values since it starts
>>>> at 0. If I change 29 to 33, the result will be the same.
>>>> I can get all values if I play with parallelism, i.e. by setting the
>>>> parallelism to 1 before print.
>>>> Or if I change 29 to 39 (I have 4 cores).
>>>> My guess is that something is wrong with the threads. BTW, in this case,
>>>> how are threads created and how does data flow between them?
>>>> On Apr 21, 2016 4:50 AM, "Aljoscha Krettek" <aljos...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> this is related to your other question about count windows. The source
>>>>> emits 29 values, so we only get two complete count windows with 10
>>>>> elements each. The last window (with the remaining 9 elements) is never
>>>>> triggered.
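A minimal plain-Java model of this behaviour (not Flink's implementation; CountWindowAllSim and firedWindows are hypothetical names), assuming a single window instance that fires every 10 elements, purges afterwards, and never fires a partially filled window when the input ends:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Plain-Java model (NOT Flink code) of countWindowAll(10) on one instance:
// a window fires every `size` elements and is then purged; a partially
// filled window at the end of the input never fires.
public class CountWindowAllSim {

    // Returns only the windows that actually fire.
    static List<List<Long>> firedWindows(List<Long> input, int size) {
        List<List<Long>> fired = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (Long v : input) {
            current.add(v);
            if (current.size() == size) {
                fired.add(current);
                current = new ArrayList<>(); // purge after firing
            }
        }
        // `current` may still hold elements here; they are silently dropped.
        return fired;
    }

    public static void main(String[] args) {
        // Same input as the program in this thread: LongStream.range(0, 29).
        List<Long> input = LongStream.range(0, 29).boxed().collect(Collectors.toList());
        List<List<Long>> fired = firedWindows(input, 10);
        System.out.println(fired.size() + " windows fired");
        // Apply the same "value % 3 == 0" filter as the window function.
        List<Long> out = fired.stream()
                .flatMap(List::stream)
                .filter(v -> v % 3 == 0)
                .collect(Collectors.toList());
        System.out.println(out); // 24 is absent: it sits in the unfired third window
    }
}
```

It prints "2 windows fired" followed by [0, 3, 6, 9, 12, 15, 18], the same values as in the original output; 24 belongs to the third window (20..28), which only ever holds 9 elements.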
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>> On Wed, 20 Apr 2016 at 23:52 Kostya Kulagin <kkula...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I think it has something to do with parallelism, and I probably do not
>>>>>> have a clear understanding of how parallelism works in Flink, but in this
>>>>>> example:
>>>>>>
>>>>>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>     DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>>>>>>
>>>>>>       @Override
>>>>>>       public void run(SourceContext<Long> ctx) throws Exception {
>>>>>>         LongStream.range(0, 29).forEach(ctx::collect);
>>>>>>       }
>>>>>>
>>>>>>       @Override
>>>>>>       public void cancel() {
>>>>>>
>>>>>>       }
>>>>>>     });
>>>>>>
>>>>>>     source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
>>>>>>       @Override
>>>>>>       public void apply(GlobalWindow window, Iterable<Long> values, Collector<Long> out) throws Exception {
>>>>>>         for (Long value : values) {
>>>>>>           if (value % 3 == 0) {
>>>>>>             out.collect(value);
>>>>>>           }
>>>>>>         }
>>>>>>       }
>>>>>>     }).print();
>>>>>>
>>>>>>     env.execute("yoyoyo");
>>>>>>
>>>>>> Why is my output like this:
>>>>>>
>>>>>> 4> 9
>>>>>> 1> 0
>>>>>> 1> 12
>>>>>> 3> 6
>>>>>> 3> 18
>>>>>> 2> 3
>>>>>> 2> 15
>>>>>>
>>>>>> ? I.e. where is the value 24, for example? I expect to see it. What
>>>>>> am I doing wrong?
>>>>>>
>>>>>
>>