Thanks, so you were right: it really is connected to the non-finishing windows problem I mentioned in the other post. I don't really need a parallelism of 1 for windows. I expect the operations on windows to be pretty expensive, and I like the idea that I can "parallelize" them.
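
A minimal plain-Java sketch (no Flink dependency) of the behaviour explained in the quoted reply below: elements from a single source are handed round-robin to the parallel window instances, and each instance fires and purges only once it has buffered 10 elements of its own. The class name `RoundRobinCountWindows`, the method `simulate`, and the assumption that distribution starts at instance 0 are illustrative only and are not part of Flink's API; the real runtime may pick a different starting instance and interleave output differently.

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinCountWindows {

    /**
     * Simulates `elements` values (0, 1, 2, ...) sent round-robin to
     * `parallelism` count-window instances of size `windowSize`.
     * Each fired window emits its multiples of 3, mirroring the
     * AllWindowFunction in the thread. Returns everything emitted.
     */
    public static List<Long> simulate(long elements, int windowSize, int parallelism) {
        // One buffer per parallel window instance (hypothetical model).
        List<List<Long>> buffers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            buffers.add(new ArrayList<>());
        }

        List<Long> emitted = new ArrayList<>();
        for (long value = 0; value < elements; value++) {
            // Assumption: round-robin starts at instance 0.
            List<Long> buffer = buffers.get((int) (value % parallelism));
            buffer.add(value);

            // Count trigger: fire when this instance alone has seen windowSize elements.
            if (buffer.size() == windowSize) {
                for (Long v : buffer) {
                    if (v % 3 == 0) {
                        emitted.add(v);
                    }
                }
                buffer.clear(); // purging trigger: start a fresh window
            }
        }
        // Elements still sitting in the buffers here belong to windows that never fire.
        return emitted;
    }

    public static void main(String[] args) {
        for (int p = 1; p <= 4; p++) {
            System.out.println("parallelism " + p + " -> " + simulate(30, 10, p));
        }
    }
}
```

Under these assumptions, 30 elements with parallelism 2 give each instance 15 elements, so each fires once and the values 0, 6, 12, 18, 3, 9, 15 come out, the same set as in the output quoted below. With parallelism 4 no instance ever reaches 10 elements, so nothing is emitted.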
Thanks for the explanation!

On Thu, Apr 21, 2016 at 8:06 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> no worries, I also had to read the doc to figure it out. :-)
>
> I now see what the problem is. The .countWindowAll().apply() pattern
> creates a WindowOperator with parallelism of 1 because the "count all" only
> works if one instance of the window operator sees all elements. When
> manually changing the parallelism it essentially becomes a "count per
> parallel instance" window operation and the elements from the source with
> parallelism 1 get distributed round-robin to the parallel instances of the
> count-window operator. This means that it will take more elements emitted
> from the source before each instance of the window fires. It's a bit
> confusing but Flink does not allow forcing the parallelism to 1 right now.
>
> About using the snapshot version, I would suggest you don't use it if you
> don't absolutely need one of the features in there that is not yet
> released. The builds are still pretty stable, however.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 13:53 Kostya Kulagin <kkula...@gmail.com> wrote:
>
>> First of all you are right about the number of elements, my bad and sorry for
>> the confusion, I need to be better at calculations :)
>>
>> However: if I change the parallelism to, let's say, 2 in windowing, i.e.
>> instead of (of course I changed 29 to 30 as well :) )
>>
>> }).print();
>>
>> put
>>
>> }).setParallelism(2).print();
>>
>> at the very bottom - I am getting:
>>
>> 3> 15
>> 3> 12
>> 2> 9
>> 2> 6
>> 4> 18
>> 04/21/2016 07:47:08 Sink: Unnamed(2/4) switched to FINISHED
>> 04/21/2016 07:47:08 Source: Custom Source(1/1) switched to FINISHED
>> 04/21/2016 07:47:08 Sink: Unnamed(4/4) switched to FINISHED
>> 04/21/2016 07:47:08 Sink: Unnamed(3/4) switched to FINISHED
>> 04/21/2016 07:47:08 TriggerWindow(GlobalWindows(),
>> PurgingTrigger(CountTrigger(10)),
>> AllWindowedStream.apply(AllWindowedStream.java:230))(2/2) switched to
>> FINISHED
>> 04/21/2016 07:47:08 TriggerWindow(GlobalWindows(),
>> PurgingTrigger(CountTrigger(10)),
>> AllWindowedStream.apply(AllWindowedStream.java:230))(1/2) switched to
>> FINISHED
>> 1> 3
>> 1> 0
>>
>> With the default setting for parallelism it works fine, same as with values 3
>> and 1.
>>
>> With 2 and 4+ it does not work. With 4+ it simply prints nothing. I.e. it
>> might be smth with how threads are finishing their execution?
>>
>> I am using the latest prod version I've found in maven: 1.0.1.
>> Can snapshot versions be used in prod? I mean how well tested are those?
>>
>> I will try the same on the master branch later today.
>>
>> Thanks!
>> Kostya
>>
>>
>> On Thu, Apr 21, 2016 at 6:38 AM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>> which version of Flink are you using? Maybe there is a bug. I've tested
>>> it on the git master (1.1-SNAPSHOT) and it works fine with varying degrees
>>> of parallelism if I change the source to emit 30 elements:
>>> LongStream.range(0, 30).forEach(ctx::collect);
>>>
>>> (The second argument of LongStream.range(start, end) is exclusive)
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>>
>>>
>>> On Thu, 21 Apr 2016 at 11:44 Kostya Kulagin <kkula...@gmail.com> wrote:
>>>
>>>> Actually this is not true - the source emits 30 values since it is
>>>> started with 0. If I change 29 to 33 the result will be the same.
>>>> I can get all values if I play with parallelism, i.e. putting parallelism 1
>>>> before print.
>>>> Or if I change 29 to 39 (I have 4 cores).
>>>> I can guess that there is smth wrong with threads. BTW in this case how
>>>> are threads created and how does data flow between them?
>>>> On Apr 21, 2016 4:50 AM, "Aljoscha Krettek" <aljos...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> this is related to your other question about count windows. The source
>>>>> emits 29 values so we only have two count-windows with 10 elements each.
>>>>> The last window is never triggered.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>> On Wed, 20 Apr 2016 at 23:52 Kostya Kulagin <kkula...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I think it has smth to do with parallelism and I probably do not have
>>>>>> a clear understanding of how parallelism works in Flink, but in this example:
>>>>>>
>>>>>> StreamExecutionEnvironment env =
>>>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>> DataStreamSource<Long> source = env.addSource(new
>>>>>>     SourceFunction<Long>() {
>>>>>>
>>>>>>   @Override
>>>>>>   public void run(SourceContext<Long> ctx) throws Exception {
>>>>>>     LongStream.range(0, 29).forEach(ctx::collect);
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public void cancel() {
>>>>>>
>>>>>>   }
>>>>>> });
>>>>>>
>>>>>> source.countWindowAll(10).apply(new AllWindowFunction<Long, Long,
>>>>>>     GlobalWindow>() {
>>>>>>   @Override
>>>>>>   public void apply(GlobalWindow window, Iterable<Long> values,
>>>>>>       Collector<Long> out) throws Exception {
>>>>>>     for (Long value : values) {
>>>>>>       if (value % 3 == 0) {
>>>>>>         out.collect(value);
>>>>>>       }
>>>>>>     }
>>>>>>   }
>>>>>> }).print();
>>>>>>
>>>>>> env.execute("yoyoyo");
>>>>>>
>>>>>> Why is my output like this:
>>>>>>
>>>>>> 4> 9
>>>>>> 1> 0
>>>>>> 1> 12
>>>>>> 3> 6
>>>>>> 3> 18
>>>>>> 2> 3
>>>>>> 2> 15
>>>>>>
>>>>>> ? I.e. where is the value 24, for example? I expect to see it. What
>>>>>> am I doing wrong?
>>>>>>
>>>>>
>>