[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16176197#comment-16176197 ]

ASF GitHub Bot commented on FLINK-7567:
---

Github user mlipkovich closed the pull request at:

    https://github.com/apache/flink/pull/4655

> DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
> ---
>
>                 Key: FLINK-7567
>                 URL: https://issues.apache.org/jira/browse/FLINK-7567
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>   Affects Versions: 1.3.2
>         Environment: OS X 10.12.6, Oracle JDK 1.8.0_144, Flink 1.3.2
>            Reporter: Peter Ertl
>            Assignee: Mikhail Lipkovich
>             Fix For: 1.4.0
>
>
> When I try to execute this simple snippet of code
> {code}
>   @Test
>   def iterateOnElements(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     // do something silly just to get iteration going ...
>     val result = env.fromElements(1, 2, 3).iterate(it => {
>       (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x'))
>     })
>     result.print()
>     env.execute()
>   }
> {code}
> I get the following exception:
> {code}
> java.lang.UnsupportedOperationException: Parallelism of the feedback stream must match the parallelism of the original stream. Parallelism of original stream: 1; parallelism of feedback stream: 8
>     at org.apache.flink.streaming.api.transformations.FeedbackTransformation.addFeedbackEdge(FeedbackTransformation.java:87)
>     at org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:77)
>     at org.apache.flink.streaming.api.scala.DataStream.iterate(DataStream.scala:519)
>     at atp.analytics.CassandraReadTest.iterate2(CassandraReadTest.scala:134)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>     at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>     at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>     at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}
> Since this is just the simplest iterating stream setup I could imagine, this error makes no sense to me :-P

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
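
The exception in the report comes down to a simple equality check between two parallelism values. The sketch below is a simplified model of that condition, not Flink's source code: `FeedbackParallelismSketch` and `checkFeedbackEdge` are hypothetical names, and only the message text and the values 1 and 8 are taken from the stack trace above.

```scala
// Simplified model of the condition described by the exception message.
// This is NOT Flink's implementation; all names here are hypothetical.
object FeedbackParallelismSketch {

  // Throws when the feedback stream's parallelism differs from the
  // parallelism of the stream the iteration was started on.
  def checkFeedbackEdge(originalParallelism: Int, feedbackParallelism: Int): Unit =
    if (originalParallelism != feedbackParallelism) {
      throw new UnsupportedOperationException(
        "Parallelism of the feedback stream must match the parallelism of the original stream. " +
          s"Parallelism of original stream: $originalParallelism; " +
          s"parallelism of feedback stream: $feedbackParallelism")
    }

  def main(args: Array[String]): Unit = {
    // Values from the report: original stream 1, feedback stream 8.
    val mismatch = scala.util.Try(checkFeedbackEdge(1, 8))
    println(mismatch.failed.map(_.getMessage).getOrElse("no error"))

    // Equal values pass the check without throwing.
    checkFeedbackEdge(1, 1)
    println("matching parallelism accepted")
  }
}
```

Seen this way, the report suggests that the stream returned by `env.fromElements(...)` had parallelism 1 while the operators inside the step function picked up a default of 8. On an affected version, aligning the two (for example with `env.setParallelism(1)`) would presumably satisfy the check, though the thread does not confirm this workaround.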
[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16176196#comment-16176196 ]

ASF GitHub Bot commented on FLINK-7567:
---

Github user mlipkovich commented on the issue:

    https://github.com/apache/flink/pull/4655

    Thanks! Closing it
[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16176151#comment-16176151 ]

ASF GitHub Bot commented on FLINK-7567:
---

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4655

    Perfect! Thanks for working on this. 👍

    I merged, could you please close this PR?
[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16173130#comment-16173130 ]

ASF GitHub Bot commented on FLINK-7567:
---

Github user mlipkovich commented on the issue:

    https://github.com/apache/flink/pull/4655

    Hi @aljoscha,

    It turned out that the exclude filter works if we escape the dollar signs. I committed these changes, but I believe this should still be fixed in japicmp.

    What do you think about merging this PR?
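
Why escaping the dollar signs could make the difference: the sketch below assumes that the exclude filter is ultimately evaluated as a regular expression, which the thread does not state explicitly. Under that assumption an unescaped `$` acts as an end-of-input anchor and not as a literal character. `DollarEscapeSketch` and `matchesAs` are hypothetical names, and the code uses plain `java.util.regex`, not japicmp.

```scala
import java.util.regex.Pattern

// Illustration with plain java.util.regex; japicmp itself is not involved.
object DollarEscapeSketch {
  // The generated method name from the japicmp report in this thread.
  val methodName = "iterate$default$3"

  // True if the whole method name matches the given regular expression.
  def matchesAs(pattern: String): Boolean = Pattern.matches(pattern, methodName)

  def main(args: Array[String]): Unit = {
    // Unescaped: '$' means "end of input", so the literal name never matches.
    println(matchesAs("iterate$default$3"))     // false
    // Escaped: '\$' matches a literal dollar sign.
    println(matchesAs("iterate\\$default\\$3")) // true
  }
}
```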
[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16167804#comment-16167804 ]

ASF GitHub Bot commented on FLINK-7567:
---

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4655

    Thanks! This doesn't help us with our situation, though.
[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16167800#comment-16167800 ]

ASF GitHub Bot commented on FLINK-7567:
---

Github user mlipkovich commented on the issue:

    https://github.com/apache/flink/pull/4655

    Thanks, Aljoscha.

    I had probably forgotten to run `clean`. The method `iterate$default$3` is a method that Scala creates automatically to compute the default value of the parameter `keepPartitioning`. I tried different ways to exclude it, but none of them helped. In any case, japicmp should handle this somehow, so I created an issue there: https://github.com/siom79/japicmp/issues/176
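
The generated method mentioned in this comment can be observed with plain Java reflection. The following is a minimal sketch: the `Stream` class is a hypothetical stand-in and not Flink's `DataStream`, and the first two parameter names are made up for illustration. Only `keepPartitioning` as the third, defaulted parameter comes from the comment above.

```scala
// Shows the synthetic "default getter" methods that the Scala compiler
// emits for parameters with default values.
object DefaultArgNamesSketch {

  // Hypothetical stand-in class; only the shape of the signature matters.
  class Stream {
    def iterate(
        step: Int => Int,
        maxWaitTimeMillis: Long = 0,
        keepPartitioning: Boolean = false): Int = step(0)
  }

  // Names of all public methods on Stream that are default-value getters.
  def defaultGetters: List[String] =
    classOf[Stream].getMethods
      .map(_.getName)
      .filter(_.contains("$default$"))
      .sorted
      .toList

  def main(args: Array[String]): Unit =
    // One getter per defaulted parameter, numbered by parameter position.
    defaultGetters.foreach(println)
}
```

Because these getters are ordinary public methods in the bytecode, removing a defaulted parameter also removes a public method, which is consistent with the `REMOVED METHOD` entry that japicmp printed for `iterate$default$3()`.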
[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16167685#comment-16167685 ]

ASF GitHub Bot commented on FLINK-7567:
---

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4655

    A local build of `mvn clean verify` fails for me because the japicmp plugin is complaining. The relevant section of the diff output from japicmp is

    ```
    ***! MODIFIED CLASS: PUBLIC org.apache.flink.streaming.api.scala.DataStream (not serializable)
        ---! REMOVED METHOD: PUBLIC(-) boolean iterate$default$3()
        +++ NEW METHOD: PUBLIC(+) org.apache.flink.streaming.api.scala.DataStream setMaxParallelism(int)
        *** MODIFIED ANNOTATION: scala.reflect.ScalaSignature
    ```

    i.e. it's complaining about `iterate$default$3()`. The problem seems to be that Scala will generate some obfuscated method name for the `iterate()` method and the `@PublicEvolving` annotation is not properly applied to that. I tried playing around with the japicmp config in the root pom file but to no avail. Maybe you can find something that works. What I added is this:

    ```
    org.apache.flink.streaming.api.scala.DataStream#iterate$default$3()
    ```

    but it seems that doesn't work.
[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166633#comment-16166633 ]

ASF GitHub Bot commented on FLINK-7567:
---

Github user mlipkovich commented on the issue:

    https://github.com/apache/flink/pull/4655

    The local build works fine. If by "compatibility plugin" you mean `mvn verify`, that also ran with no issues.
[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166365#comment-16166365 ]

ASF GitHub Bot commented on FLINK-7567:
---

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4655

You can try and run the build locally to have a look at the file generated by the compatibility plugin. Then we can figure out why it's complaining.
[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16157885#comment-16157885 ]

ASF GitHub Bot commented on FLINK-7567:
---

Github user mlipkovich commented on the issue:

    https://github.com/apache/flink/pull/4655

As I understand it, the build failed because of the changed API method. The changed method is annotated with PublicEvolving, so there should be a way to change it. As @aljoscha mentioned in https://issues.apache.org/jira/browse/FLINK-7567, there is no way to add a method with the updated API and deprecate the current one, because of the default parameter.
[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156997#comment-16156997 ]

Mikhail Lipkovich commented on FLINK-7567:
---

Hi Peter,

As a user, I agree with Aljoscha that we should not do too many things silently. Changing the parallelism can significantly change performance, and a user may simply have set the wrong parallelism on the input stream by mistake. It's better for the user to get an error message and make the decision themselves. The fix could be to change either the input stream or the feedback stream; we don't actually know which of the two should be modified.
[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156986#comment-16156986 ]

ASF GitHub Bot commented on FLINK-7567:
---

GitHub user mlipkovich opened a pull request:

    https://github.com/apache/flink/pull/4655

    [FLINK-7567]: Removed keepPartitioning parameter from iterate method

## What is the purpose of the change

Removed parameter keepPartitioning from DataStream#iterate method since it's ignored. Also slightly modified error message related to different parallelism levels of input and feedback streams

## Brief change log

- Removed parameter keepPartitioning from DataStream#iterate

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / no) no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) yes
- The serializers: (yes / no / don't know) no
- The runtime per-record code paths (performance sensitive): (yes / no / don't know) no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) no

## Documentation

- Does this pull request introduce a new feature? (yes / no) no
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) not applicable

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mlipkovich/flink FLINK-7567

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4655.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4655

commit 2525aef6f65d297142472ae6532e3bddb08df0fd
Author: Mikhail Lipkovich
Date: 2017-09-07T14:05:22Z

    [FLINK-7567]: Removed keepPartitioning parameter from iterate method
[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155873#comment-16155873 ]

Peter Ertl commented on FLINK-7567:
---

If the parallelism of the feedback stream MUST be equal to the parallelism of the input stream, why not make it the default?
[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155329#comment-16155329 ]

Aljoscha Krettek commented on FLINK-7567:
---

Yes, that message sounds good!

I'm hesitant about the parameter as well, but the method is declared as {{@PublicEvolving}}. I'm also afraid in Scala it's not possible to add another method without that parameter and deprecate the original method because of optional parameters.
[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155310#comment-16155310 ]

Mikhail Lipkovich commented on FLINK-7567:
---

I guess removing the parameter could be dangerous, since it breaks the current API? But if you think it's fine, I can assign this task to myself and remove it.

Regarding the message, I thought about adding a hint on how to resolve the issue. Something like "Parallelism of original stream: 1; parallelism of feedback stream: 8. Parallelism can be modified using DataStream#setParallelism() method"
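For readers following the thread, the check under discussion and the extended message proposed above can be sketched roughly as follows. This is a simplified stand-in written for illustration, not the code in Flink's FeedbackTransformation: the class name FeedbackParallelismCheck and the method checkFeedbackEdge(int, int) are hypothetical, and only the message wording is taken from the exception and the comment above.

```java
// Simplified, hypothetical model of the feedback-edge parallelism check.
// This is NOT Flink's FeedbackTransformation; names here are illustrative only.
class FeedbackParallelismCheck {

    /**
     * Throws if the feedback stream's parallelism differs from the input
     * stream's parallelism. The message combines the existing wording from
     * the reported exception with the hint proposed in the thread.
     */
    static void checkFeedbackEdge(int inputParallelism, int feedbackParallelism) {
        if (inputParallelism != feedbackParallelism) {
            throw new UnsupportedOperationException(
                "Parallelism of the feedback stream must match the parallelism of the original stream. "
                    + "Parallelism of original stream: " + inputParallelism
                    + "; parallelism of feedback stream: " + feedbackParallelism
                    + ". Parallelism can be modified using DataStream#setParallelism() method");
        }
    }

    public static void main(String[] args) {
        // Matching parallelism: the check passes silently.
        checkFeedbackEdge(8, 8);

        // The situation from the report: input parallelism 1, feedback parallelism 8.
        try {
            checkFeedbackEdge(1, 8);
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The sketch deliberately only reports the mismatch and leaves the decision to the caller, which is the behaviour argued for in this thread; it does not try to align either side automatically.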
[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155268#comment-16155268 ]

Aljoscha Krettek commented on FLINK-7567:
---

Yes, we should in fact report that the parameter is unused or maybe even remove it. (It seems I left it there by accident.)

As for resolving the problem, I tend to think that we shouldn't do too much stuff automatically. I.e. in this case we would have to set the parallelism to 1 for the iteration feedback, which might not be what a user wants. The easiest is therefore to let the user know that there is a problem and let them figure out what they want to do.

What message could we give the user? I think the existing message already states quite clearly what the problem is.
[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16153629#comment-16153629 ]

Mikhail Lipkovich commented on FLINK-7567:
--
Since DataStream.iterate() already has a 'keepPartitioning' flag, it seems to me that this method could align the parallelism of the input and feedback streams when that is possible, and throw an exception with a clearer message when it is not. I'm a newbie here, so maybe I'm missing something important. What do you think, [~aljoscha]?
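A rough sketch of the behaviour proposed in the comment above, written as a plain Scala model rather than as Flink code. Every name here (StreamSpec, FeedbackAlignment, explicitlySet) is hypothetical, and "when it's possible" is interpreted, as an assumption, to mean "the user did not explicitly set the feedback stream's parallelism":

```scala
// Hypothetical model, NOT Flink code: all names are invented for illustration.
final case class StreamSpec(name: String, parallelism: Int, explicitlySet: Boolean)

object FeedbackAlignment {
  /** Returns the feedback stream spec that would be used to close the iteration. */
  def align(input: StreamSpec, feedback: StreamSpec): StreamSpec =
    if (feedback.parallelism == input.parallelism) {
      // Nothing to do: the parallelism already matches.
      feedback
    } else if (!feedback.explicitlySet) {
      // Assumption: a parallelism inherited from the environment default may be overridden.
      feedback.copy(parallelism = input.parallelism)
    } else {
      // The user asked for a specific parallelism, so fail with an actionable message.
      throw new UnsupportedOperationException(
        s"Cannot close the iteration: feedback stream '${feedback.name}' was explicitly set to " +
          s"parallelism ${feedback.parallelism}, but input stream '${input.name}' has " +
          s"parallelism ${input.parallelism}. Set the feedback stream's parallelism to " +
          s"${input.parallelism}.")
    }
}
```

In this model, a feedback stream that merely inherited the default parallelism of 8 would be aligned to the source's parallelism of 1, while one that was explicitly set to 8 would be rejected with a message naming both streams.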
[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16153572#comment-16153572 ]

Peter Ertl commented on FLINK-7567:
---
Since this is fairly non-trivial to detect, shouldn't Flink handle it somehow in this case (set default parallelism = 1)?
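Until Flink does something like this automatically, the default-parallelism idea from the comment above can be applied by hand. The following is a minimal sketch, assuming the Flink 1.3 Scala DataStream API on the classpath; the object name IterateWithDefaultParallelism is made up for the example. Setting the environment's default parallelism to 1 means the operators inside the iteration inherit the same parallelism as the non-parallel fromElements source:

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical wrapper object; only the Flink calls come from the real API.
object IterateWithDefaultParallelism {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Operators without an explicit parallelism now default to 1, which
    // matches the parallelism of the fromElements source.
    env.setParallelism(1)

    val result = env.fromElements(1, 2, 3).iterate(it => {
      // First element: feedback stream; second element: output stream.
      (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x'))
    })

    result.print()
    env.execute()
  }
}
```

This trades away parallel execution for the whole job, so it is mainly suited to tests and small experiments. A job with an iteration may also not terminate on its own once the input is exhausted; the maxWaitTimeMillis argument of iterate is worth checking in that case.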
[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16153521#comment-16153521 ]

Mikhail Lipkovich commented on FLINK-7567:
--
Hi Peter, this error is a consequence of https://issues.apache.org/jira/browse/FLINK-2398. Input and feedback streams are no longer allowed to have different parallelism. What you can do in your particular example is change the parallelism of the feedback stream:
{code:java}
it => {
  // feedback stream: set its parallelism to 1 to match the source
  (it.filter(_ > 0).map(_ - 1).setParallelism(1),
   it.filter(_ > 0).map(_ => 'x'))
}
{code}
We should probably at least document that the keepPartitioning parameter of DataStream.iterate is ignored.