Re: Values are missing, probably due to parallelism?
Thanks, so you were right: it really is connected to the problem of windows not finishing that I mentioned in the other post. I don't really need a parallelism of 1 for the windows. I expect the operation on the windows to be pretty expensive, and I like the idea that I can "parallelize" it.

Thanks for the explanation!

On Thu, Apr 21, 2016 at 8:06 AM, Aljoscha Krettek wrote:
> [...]
Re: Values are missing, probably due to parallelism?
Hi,

no worries, I also had to read the docs to figure it out. :-)

I now see what the problem is. The .countWindowAll().apply() pattern creates a WindowOperator with a parallelism of 1, because "count all" only works if a single instance of the window operator sees all elements. When you manually change the parallelism, it essentially becomes a "count per parallel instance" window operation: the elements from the source (which has parallelism 1) are distributed round-robin to the parallel instances of the count-window operator. This means that the source has to emit more elements before each instance of the window fires. It's a bit confusing, but right now Flink has no way to force this operator's parallelism to stay at 1, so it lets you override it.

About using the snapshot version: I would suggest you don't use it unless you absolutely need one of the features in there that is not yet released. The builds are still pretty stable, however.

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 13:53 Kostya Kulagin wrote:
> [...]
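To make the "count per parallel instance" effect concrete, here is a plain-Java model with no Flink dependency; the class and method names are made up for illustration. It assumes that the source hands the 30 elements to the p window instances in strict round-robin order starting at instance 0, and that each instance runs its own purging count trigger of 10. Flink's partitioner may start at a different instance, but the number of elements each instance receives is the same either way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Illustrative model (not Flink code): a count window of size `windowSize`
// whose operator runs with `parallelism` instances. Assumption: elements are
// distributed round-robin, and each instance counts only its own elements.
public class RoundRobinCountWindows {

    // Returns the multiples of 3 that reach the sink, in firing order.
    static List<Long> emittedMultiplesOfThree(long numElements, int parallelism, int windowSize) {
        List<List<Long>> buffers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            buffers.add(new ArrayList<>());
        }
        List<Long> emitted = new ArrayList<>();
        for (long value = 0; value < numElements; value++) {
            // Round-robin: element i goes to instance i % parallelism.
            List<Long> buffer = buffers.get((int) (value % parallelism));
            buffer.add(value);
            if (buffer.size() == windowSize) {
                // This instance's own count reached windowSize: fire and purge.
                for (Long v : buffer) {
                    if (v % 3 == 0) {
                        emitted.add(v);
                    }
                }
                buffer.clear();
            }
        }
        // Anything still buffered here never fires and never reaches the sink.
        return emitted;
    }

    public static void main(String[] args) {
        for (int p = 1; p <= 5; p++) {
            System.out.println("parallelism " + p + " -> "
                    + new TreeSet<>(emittedMultiplesOfThree(30, p, 10)));
        }
    }
}
```

Under these assumptions, parallelism 2 yields the seven values 0 through 18 seen in the thread, parallelism 1 and 3 yield all ten multiples of 3, and parallelism 4 or more yields nothing, because each instance receives only 7 or 8 of the 30 elements and never reaches a count of 10.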
Re: Values are missing, probably due to parallelism?
First of all, you are right about the number of elements. My bad, and sorry for the confusion; I need to be better at calculations :)

However, if I change the parallelism of the windowing to, let's say, 2 (of course I changed 29 to 30 as well :) ), i.e. instead of

    }).print();

I put

    }).setParallelism(2).print();

at the very bottom, I am getting:

    3> 15
    3> 12
    2> 9
    2> 6
    4> 18
    04/21/2016 07:47:08 Sink: Unnamed(2/4) switched to FINISHED
    04/21/2016 07:47:08 Source: Custom Source(1/1) switched to FINISHED
    04/21/2016 07:47:08 Sink: Unnamed(4/4) switched to FINISHED
    04/21/2016 07:47:08 Sink: Unnamed(3/4) switched to FINISHED
    04/21/2016 07:47:08 TriggerWindow(GlobalWindows(), PurgingTrigger(CountTrigger(10)), AllWindowedStream.apply(AllWindowedStream.java:230))(2/2) switched to FINISHED
    04/21/2016 07:47:08 TriggerWindow(GlobalWindows(), PurgingTrigger(CountTrigger(10)), AllWindowedStream.apply(AllWindowedStream.java:230))(1/2) switched to FINISHED
    1> 3
    1> 0

With the default parallelism setting it works fine, and the same goes for the values 3 and 1.

With 2 and 4+ it does not work. With 4+ it simply prints nothing. Could it be something to do with how the threads finish their execution?

I am using the latest production version I found in Maven: 1.0.1. Can snapshot versions be used in production? I mean, how well tested are they?

I will try the same on the master branch later today.

Thanks!
Kostya

On Thu, Apr 21, 2016 at 6:38 AM, Aljoscha Krettek wrote:
> [...]
Re: Values are missing, probably due to parallelism?
Hi,

which version of Flink are you using? Maybe there is a bug. I've tested it on the git master (1.1-SNAPSHOT) and it works fine with varying degrees of parallelism if I change the source to emit 30 elements:

    LongStream.range(0, 30).forEach(ctx::collect);

(The second argument of LongStream.range(start, end) is exclusive.)

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 11:44 Kostya Kulagin wrote:
> [...]
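A quick standalone check of the bounds; the class name is just for illustration, and only `java.util.stream.LongStream` methods from the JDK are used:

```java
import java.util.stream.LongStream;

// LongStream.range(start, end) excludes `end`;
// LongStream.rangeClosed(start, end) includes it.
public class RangeBounds {

    // How many values a source calling LongStream.range(0, end).forEach(ctx::collect) emits.
    static long emittedCount(long end) {
        return LongStream.range(0, end).count();
    }

    public static void main(String[] args) {
        System.out.println(emittedCount(29));                          // 29 (values 0..28)
        System.out.println(emittedCount(30));                          // 30 (values 0..29)
        System.out.println(LongStream.range(0, 29).max().getAsLong()); // 28
        System.out.println(LongStream.rangeClosed(0, 29).count());     // 30
    }
}
```

So `range(0, 29)` stops at 28, and either `range(0, 30)` or `rangeClosed(0, 29)` is needed to fill three windows of 10.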
Re: Values are missing, probably due to parallelism?
Actually, this is not true: the source emits 30 values, since it starts with 0. If I change 29 to 33, the result will be the same.

I can get all the values if I play with the parallelism, i.e. by putting a parallelism of 1 before print(), or if I change 29 to 39 (I have 4 cores).

My guess is that there is something wrong with the threads. By the way, in this case, how are the threads created and how does the data flow between them?

On Apr 21, 2016 4:50 AM, "Aljoscha Krettek" wrote:
> [...]
Re: Values are missing, probably due to parallelism?
Hi,

this is related to your other question about count windows. The source emits 29 values, so we only get two count windows with 10 elements each. The last window, which would hold the remaining 9 elements, is never triggered.

Cheers,
Aljoscha

On Wed, 20 Apr 2016 at 23:52 Kostya Kulagin wrote:
> [...]
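The arithmetic can be sketched in plain Java by modelling a single window instance with a purging count trigger of 10. This is an illustrative model, not Flink code, and the class name is made up; it assumes, as described above, that a partially filled window is simply never fired when the input ends.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model (not Flink code) of one count-window instance with a
// purging count trigger: every `size` elements the buffered window is fired
// and cleared. Assumption: a partially filled buffer is never fired when the
// input ends.
public class SingleCountWindow {

    // Returns the contents of every window that fired, in firing order.
    static List<List<Long>> firedWindows(long numElements, int size) {
        List<List<Long>> fired = new ArrayList<>();
        List<Long> buffer = new ArrayList<>();
        for (long value = 0; value < numElements; value++) {
            buffer.add(value);
            if (buffer.size() == size) {
                fired.add(new ArrayList<>(buffer)); // the trigger fires...
                buffer.clear();                     // ...and purges the window
            }
        }
        // Whatever is still in `buffer` here never reaches the window function.
        return fired;
    }

    public static void main(String[] args) {
        long numElements = 29; // LongStream.range(0, 29) emits 0..28
        List<List<Long>> fired = firedWindows(numElements, 10);

        List<Long> multiplesOfThree = new ArrayList<>();
        long firedCount = 0;
        for (List<Long> window : fired) {
            firedCount += window.size();
            for (Long v : window) {
                if (v % 3 == 0) {
                    multiplesOfThree.add(v);
                }
            }
        }
        System.out.println("windows fired: " + fired.size());                      // 2
        System.out.println("elements never fired: " + (numElements - firedCount)); // 9
        System.out.println("multiples of 3 emitted: " + multiplesOfThree);         // [0, 3, 6, 9, 12, 15, 18]
    }
}
```

The emitted values match the output in the original question; 21, 24 and 27 sit in the third, never-fired window.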
Values are missing, probably due to parallelism?
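I think it has something to do with parallelism, and I probably don't have a clear understanding of how parallelism works in Flink, but in this example:

    import java.util.stream.LongStream;

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
    import org.apache.flink.util.Collector;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            LongStream.range(0, 29).forEach(ctx::collect);
        }

        @Override
        public void cancel() {
        }
    });

    source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
        @Override
        public void apply(GlobalWindow window, Iterable<Long> values, Collector<Long> out) throws Exception {
            // Forward only the multiples of 3 from each window of 10 elements
            for (Long value : values) {
                if (value % 3 == 0) {
                    out.collect(value);
                }
            }
        }
    }).print();

    env.execute("yoyoyo");

why is my output like this?

    4> 9
    1> 0
    1> 12
    3> 6
    3> 18
    2> 3
    2> 15

Where is the value 24, for example? I expect to see it. What am I doing wrong?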