[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176197#comment-16176197 ]

ASF GitHub Bot commented on FLINK-7567:
---

Github user mlipkovich closed the pull request at:

https://github.com/apache/flink/pull/4655


> DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
> ---
>
> Key: FLINK-7567
> URL: https://issues.apache.org/jira/browse/FLINK-7567
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.3.2
> Environment: OS X 10.12.6, Oracle JDK 1.8.0_144, Flink 1.3.2
> Reporter: Peter Ertl
> Assignee: Mikhail Lipkovich
> Fix For: 1.4.0
>
>
> When I try to execute this simple snippet of code:
> {code}
> import org.apache.flink.streaming.api.scala._
> import org.junit.Test
>
>   @Test
>   def iterateOnElements(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     // do something silly just to get iteration going ...
>     // the step function returns (feedback stream, output stream)
>     val result = env.fromElements(1, 2, 3).iterate(it => {
>       (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x'))
>     })
>     result.print()
>     env.execute()
>   }
> {code}
> I get the following exception:
> {code}
> java.lang.UnsupportedOperationException: Parallelism of the feedback stream must match the parallelism of the original stream. Parallelism of original stream: 1; parallelism of feedback stream: 8
>   at org.apache.flink.streaming.api.transformations.FeedbackTransformation.addFeedbackEdge(FeedbackTransformation.java:87)
>   at org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:77)
>   at org.apache.flink.streaming.api.scala.DataStream.iterate(DataStream.scala:519)
>   at atp.analytics.CassandraReadTest.iterate2(CassandraReadTest.scala:134)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>   at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>   at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}
> Since this is just the simplest iterating stream setup I could imagine, this error 
> makes no sense to me :-P
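The exception message already names the mismatch: parallelism 1 on the original stream, 8 on the feedback stream. One plausible reading, stated as an assumption rather than taken from the thread, is that `env.fromElements(...)` is a non-parallel source (parallelism 1), the iteration head inherits the parallelism of its input, and the `filter`/`map` operators inside the step function get the environment's default parallelism (presumably 8 on the reporter's machine). The hypothetical, simplified Scala model below only reproduces that rule; `Node`, `iterationHead` and `checkFeedback` are illustrative names, not Flink API.

```scala
// Hypothetical, simplified model of the rule behind the exception above.
// Node, iterationHead and checkFeedback are illustrative names, not Flink API.
object FeedbackParallelismModel {

  // A stream operator reduced to its name and parallelism.
  final case class Node(name: String, parallelism: Int)

  // Assumption: the iteration head takes over the parallelism of its input.
  def iterationHead(input: Node): Node = Node("Feedback", input.parallelism)

  // Accepts the feedback edge only if both parallelisms are equal;
  // otherwise returns an error message shaped like the reported one.
  def checkFeedback(head: Node, feedback: Node): Either[String, Node] =
    if (feedback.parallelism == head.parallelism) Right(head)
    else Left(
      "Parallelism of the feedback stream must match the parallelism of the " +
        s"original stream. Parallelism of original stream: ${head.parallelism}; " +
        s"parallelism of feedback stream: ${feedback.parallelism}")

  def main(args: Array[String]): Unit = {
    val defaultParallelism = 8                        // the value seen in the report
    val head = iterationHead(Node("fromElements", 1)) // non-parallel source
    val feedback = Node("map", defaultParallelism)    // it.filter(_ > 0).map(_ - 1)

    // Mismatch 1 vs 8: rejected, as in the reported exception.
    println(checkFeedback(head, feedback))
    // Matching parallelisms: accepted.
    println(checkFeedback(head, feedback.copy(parallelism = head.parallelism)))
  }
}
```

The model only mirrors the check; it makes no claim about how the fix merged for 1.4.0 resolves the mismatch inside Flink.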



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176196#comment-16176196 ]

ASF GitHub Bot commented on FLINK-7567:
---

Github user mlipkovich commented on the issue:

https://github.com/apache/flink/pull/4655
  
Thanks! Closing it


[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176151#comment-16176151 ]

ASF GitHub Bot commented on FLINK-7567:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4655
  
Perfect! Thanks for working on this.  

I merged it. Could you please close this PR?


[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173130#comment-16173130 ]

ASF GitHub Bot commented on FLINK-7567:
---

Github user mlipkovich commented on the issue:

https://github.com/apache/flink/pull/4655
  
Hi @aljoscha 
It turned out that the exclude filter works if we escape the dollar signs. I
committed these changes, but I still believe this should be fixed in japicmp.
What do you think about merging this PR?
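Why escaping helps can be illustrated without japicmp itself. Assuming the exclude entry is eventually matched as a Java regular expression (an assumption; the thread only says that escaping made it work), an unescaped `$` is an end-of-input anchor rather than a literal character, so a pattern containing `iterate$default$3` cannot match that method name. A minimal sketch using only `String.matches` and `java.util.regex.Pattern.quote`; the object `DollarEscaping` is hypothetical:

```scala
import java.util.regex.Pattern

// Hypothetical illustration: DollarEscaping is not part of japicmp or Flink.
object DollarEscaping {
  // A plain (non-interpolated) Scala string, so '$' is a literal character here.
  val methodName = "iterate$default$3"

  // Unescaped: in a regex '$' is an end-of-input anchor, so right after
  // "iterate" the pattern demands the end of the string and the match fails.
  def unescapedMatches: Boolean = methodName.matches("iterate$default$3")

  // Escaped by hand: the source text "\\$" is the regex \$, a literal dollar.
  def escapedMatches: Boolean = methodName.matches("iterate\\$default\\$3")

  // Pattern.quote escapes every metacharacter of the string at once.
  def quotedMatches: Boolean = methodName.matches(Pattern.quote(methodName))

  def main(args: Array[String]): Unit =
    println(s"unescaped=$unescapedMatches escaped=$escapedMatches quoted=$quotedMatches")
}
```

Running `main` prints `unescaped=false escaped=true quoted=true`. How japicmp actually parses its exclude entries is not shown here.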


[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167804#comment-16167804 ]

ASF GitHub Bot commented on FLINK-7567:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4655
  
Thanks! This doesn't help us with our situation, though.


[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167800#comment-16167800 ]

ASF GitHub Bot commented on FLINK-7567:
---

Github user mlipkovich commented on the issue:

https://github.com/apache/flink/pull/4655
  
Thanks, Aljoscha.
I probably forgot to run `clean`.
The method `iterate$default$3` is generated automatically by the Scala compiler
to compute the default value of the parameter `keepPartitioning`. I tried
several ways to exclude it, but none of them helped. In any case, japicmp
should handle this somehow, so I created an issue there:
https://github.com/siom79/japicmp/issues/176
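The generated method can be observed with plain Java reflection. The sketch below uses a hypothetical class `FakeStream` whose `iterate` has three parameters, the last being a defaulted `keepPartitioning`; the second parameter and all default values are assumptions chosen only to place `keepPartitioning` third, not copied from Flink. The Scala compiler emits a public `iterate$default$N` getter for each defaulted parameter N, which is the kind of method japicmp reports.

```scala
// Hypothetical stand-in for the Scala DataStream API; only the shape of the
// parameter list matters here. Parameter 2 and all defaults are assumptions.
class FakeStream {
  def iterate(stepFunction: Int => Int,
              maxWaitTimeMillis: Long = 0,
              keepPartitioning: Boolean = false): Int = stepFunction(0)
}

object DefaultGetters {
  // Public methods of FakeStream whose names start with "iterate": the method
  // itself plus one compiler-generated getter per defaulted parameter.
  def iterateMethods: Seq[String] =
    classOf[FakeStream].getMethods.map(_.getName)
      .filter(_.startsWith("iterate")).sorted.toSeq

  def main(args: Array[String]): Unit =
    iterateMethods.foreach(println)
}
```

Running `main` should print `iterate`, `iterate$default$2` and `iterate$default$3`. Because the getters are separate public methods, an annotation written on `iterate` itself need not appear on them, which fits the japicmp output discussed in this thread.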


[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167685#comment-16167685 ]

ASF GitHub Bot commented on FLINK-7567:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4655
  
A local build of `mvn clean verify` fails for me because the japicmp plugin 
is complaining. The relevant section of the diff output from japicmp is
```
***! MODIFIED CLASS: PUBLIC org.apache.flink.streaming.api.scala.DataStream (not serializable)
---! REMOVED METHOD: PUBLIC(-) boolean iterate$default$3()
+++  NEW METHOD: PUBLIC(+) org.apache.flink.streaming.api.scala.DataStream setMaxParallelism(int)
***  MODIFIED ANNOTATION: scala.reflect.ScalaSignature
```

i.e. it's complaining about `iterate$default$3()`. The problem seems to be
that Scala generates a synthetic method with a mangled name for the default
argument of `iterate()`, and the `@PublicEvolving` annotation is not applied
to that method. I tried playing around with the japicmp config in the root
pom file, but to no avail. Maybe you can find something that works.

What I added to the exclude list is this:
```
org.apache.flink.streaming.api.scala.DataStream#iterate$default$3()
```

but that doesn't seem to work.


[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166633#comment-16166633 ]

ASF GitHub Bot commented on FLINK-7567:
---

Github user mlipkovich commented on the issue:

https://github.com/apache/flink/pull/4655
  
The local build works fine. If by the compatibility plugin you mean `mvn verify`,
that also ran with no issues.


> DataStream#iterate() on env.fromElements() / env.fromCollection() does not 
> work
> ---
>
> Key: FLINK-7567
> URL: https://issues.apache.org/jira/browse/FLINK-7567
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.2
> Environment: OS X 10.12.6, Oracle JDK 1.8.0_144, Flink 1.3.2
>Reporter: Peter Ertl
>Assignee: Mikhail Lipkovich
>
> When I try to execute this simple snippet of code
> {code}
>   @Test
>   def iterateOnElements(): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> // do something silly just do get iteration going ...
> val result = env.fromElements(1, 2, 3).iterate(it => {
>   (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x'))
> })
> result.print()
> env.execute()
>   }
> {code}
> I get the following exception:
> {code}
> java.lang.UnsupportedOperationException: Parallelism of the feedback stream 
> must match the parallelism of the original stream. Parallelism of original 
> stream: 1; parallelism of feedback stream: 8
>   at org.apache.flink.streaming.api.transformations.FeedbackTransformation.addFeedbackEdge(FeedbackTransformation.java:87)
>   at org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:77)
>   at org.apache.flink.streaming.api.scala.DataStream.iterate(DataStream.scala:519)
>   at atp.analytics.CassandraReadTest.iterate2(CassandraReadTest.scala:134)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>   at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>   at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}
> Since this is just the simplest iterating stream setup I could imagine, this error 
> makes no sense to me :-P
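To make the failing check concrete, here is a minimal, Flink-free Scala model of it. This is a sketch under stated assumptions and not Flink's code: `Op`, `FeedbackCheck`, `fromElements` and `operator` are hypothetical stand-ins, and the value 8 stands in for the environment's default parallelism (a local environment typically defaults to the number of CPU cores). The message text is copied from the exception above. The model shows that `fromElements`/`fromCollection` create a non-parallel source with parallelism 1, and the iteration inherits that parallelism. Meanwhile, the `filter`/`map` operators inside the step function pick up the default parallelism, so the feedback edge is rejected.

```scala
// Hypothetical, simplified model of the parallelism check that fails above.
// None of these names are Flink APIs; they only mirror the behaviour described in this issue.
final case class Op(name: String, parallelism: Int)

object FeedbackCheck {
  // Assumption: stands in for the environment's default parallelism (8 in the report).
  val DefaultParallelism: Int = 8

  // fromElements/fromCollection build a non-parallel source, so its parallelism is always 1.
  def fromElements(): Op = Op("fromElements", 1)

  // Operators such as filter/map use the default parallelism unless one is set explicitly.
  def operator(name: String, parallelism: Int = DefaultParallelism): Op = Op(name, parallelism)

  // The iteration inherits the input's parallelism; every feedback stream must match it.
  def addFeedbackEdge(input: Op, feedback: Op): Unit =
    if (feedback.parallelism != input.parallelism)
      throw new UnsupportedOperationException(
        "Parallelism of the feedback stream must match the parallelism of the original stream. " +
          s"Parallelism of original stream: ${input.parallelism}; " +
          s"parallelism of feedback stream: ${feedback.parallelism}")
}
```

In this model, `FeedbackCheck.addFeedbackEdge(FeedbackCheck.fromElements(), FeedbackCheck.operator("map"))` throws with "original stream: 1; parallelism of feedback stream: 8". Passing `FeedbackCheck.operator("map", 1)` as the feedback stream succeeds. That corresponds roughly to calling `setParallelism(1)` on the feedback stream in the real Scala `DataStream` API.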



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166365#comment-16166365
 ] 

ASF GitHub Bot commented on FLINK-7567:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4655
  
You can try and run the build locally to have a look at the file generated 
by the compatibility plugin. Then we can figure out why it's complaining.


[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work

2017-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16157885#comment-16157885
 ] 

ASF GitHub Bot commented on FLINK-7567:
---

Github user mlipkovich commented on the issue:

https://github.com/apache/flink/pull/4655
  
As I understand it, the build failed because an API method was changed. The 
changed method is annotated @PublicEvolving, so there should be a way to change 
it. As @aljoscha mentioned in https://issues.apache.org/jira/browse/FLINK-7567, 
there is no way to add a method with the updated API and deprecate the current 
one, because of the default parameter.
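The sketch below shows one plausible reason a compatibility check could object, under stated assumptions. Scala compiles default arguments into extra getter methods (named like `iterate$default$2`) and keeps every parameter in the JVM method signature. Dropping a defaulted parameter therefore changes the bytecode signature, even though one- and two-argument call sites still compile. `OldApi`, `NewApi` and `SignatureDiff` are hypothetical classes that only mimic the shape of the signature before and after the PR. They are not Flink's `DataStream`.

```scala
// Hypothetical classes mimicking the shape of iterate() before and after removing
// the ignored keepPartitioning parameter (not Flink's DataStream).
class OldApi {
  def iterate(step: Int => Int, maxWaitTimeMillis: Long = 0L, keepPartitioning: Boolean = false): Int =
    step(1) // keepPartitioning is ignored, as described in the issue
}

class NewApi {
  def iterate(step: Int => Int, maxWaitTimeMillis: Long = 0L): Int =
    step(1)
}

object SignatureDiff {
  // JVM-level methods whose names start with "iterate", including the
  // compiler-generated default-argument getters, mapped to their parameter counts.
  def iterateMethods(cls: Class[_]): Map[String, Int] =
    cls.getDeclaredMethods.toSeq
      .filter(_.getName.startsWith("iterate"))
      .map(m => m.getName -> m.getParameterCount)
      .toMap
}
```

Call sites such as `iterate(f)` or `iterate(f, 1000L)` compile against both shapes. Only callers that pass `keepPartitioning` explicitly, or code already compiled against the old three-parameter method, would break.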


[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work

2017-09-07 Thread Mikhail Lipkovich (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16156997#comment-16156997
 ] 

Mikhail Lipkovich commented on FLINK-7567:
--

Hi Peter,
As a user, I agree with Aljoscha that we should not do too many things silently. 
Changing the parallelism level can significantly change performance, and there 
can be situations where a user mistakenly set the wrong parallelism for the 
input stream. It's better to give the user an error message and let them make 
the decision themselves: the fix can be to modify either the input stream or the 
feedback stream, and we actually don't know which of the two should be modified.

[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work

2017-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16156986#comment-16156986
 ] 

ASF GitHub Bot commented on FLINK-7567:
---

GitHub user mlipkovich opened a pull request:

https://github.com/apache/flink/pull/4655

[FLINK-7567]: Removed keepPartitioning parameter from iterate method

## What is the purpose of the change

Removed the keepPartitioning parameter from the DataStream#iterate method, since 
it is ignored. Also slightly modified the error message about differing 
parallelism levels of the input and feedback streams.

## Brief change log

  - Removed parameter keepPartitioning from DataStream#iterate 

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)  no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no) yes
  - The serializers: (yes / no / don't know) no
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know) no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) no

## Documentation

  - Does this pull request introduce a new feature? (yes / no) no
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented) not applicable



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mlipkovich/flink FLINK-7567

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4655.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4655


commit 2525aef6f65d297142472ae6532e3bddb08df0fd
Author: Mikhail Lipkovich 
Date:   2017-09-07T14:05:22Z

[FLINK-7567]: Removed keepPartitioning parameter from iterate method




[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work

2017-09-06 Thread Peter Ertl (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16155873#comment-16155873
 ] 

Peter Ertl commented on FLINK-7567:
---

If the parallelism of the feedback stream MUST be equal to the parallelism of the 
input stream, why not make it the default?

[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work

2017-09-06 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16155329#comment-16155329
 ] 

Aljoscha Krettek commented on FLINK-7567:
-

Yes, that message sounds good! I'm hesitant about removing the parameter as well, 
but the method is declared as {{@PublicEvolving}}. I'm also afraid that in Scala 
it's not possible to add another method without that parameter and deprecate the 
original method, because of the optional parameters.
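The Scala restriction mentioned here can be sketched with a hypothetical `Api` object that is not Flink code. Scala allows at most one overloaded alternative of a method to declare default arguments. Giving both versions `maxWaitTimeMillis: Long = 0L` is rejected by the Scala 2 compiler with an error along the lines of "multiple overloaded alternatives of method iterate define default arguments". The replacement overload therefore has to drop the default, and a one-argument call still resolves to the deprecated original. The deprecation message and `since` value below are placeholders.

```scala
object Api {
  // Original signature: both trailing parameters have defaults.
  @deprecated("keepPartitioning is ignored", "hypothetical")
  def iterate(step: Int => Int, maxWaitTimeMillis: Long = 0L, keepPartitioning: Boolean = false): String =
    "old"

  // Replacement overload: it may NOT declare a default for maxWaitTimeMillis, because
  // only one overloaded alternative is allowed to define default arguments.
  def iterate(step: Int => Int, maxWaitTimeMillis: Long): String =
    "new"
}
```

With these definitions, `Api.iterate(f)` can only match the deprecated method. `Api.iterate(f, 1000L)` picks the new overload, because Scala's overload resolution prefers the alternative that needs no default arguments. Every existing one-argument call would keep hitting the deprecated method, which illustrates why a deprecate-and-replace path is awkward here.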


[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work

2017-09-06 Thread Mikhail Lipkovich (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16155310#comment-16155310
 ] 

Mikhail Lipkovich commented on FLINK-7567:
--

I guess removing the parameter could be dangerous, since it breaks the current API? 
But if you think it's fine, I can assign this task to myself and remove it.
Regarding the message, I was thinking of adding a hint on how to resolve the issue. 
Something like:
"Parallelism of original stream: 1; parallelism of feedback stream: 8. 
Parallelism can be modified using DataStream#setParallelism() method" 
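A helper producing the proposed message might look like the sketch below. `FeedbackMessages.feedbackParallelismMessage` is a hypothetical name, not an existing Flink function. The hint text is taken verbatim from the suggestion above, and the first sentence reuses the wording of the existing exception.

```scala
object FeedbackMessages {
  // Hypothetical helper: keeps the existing wording and appends a hint on how to
  // resolve the mismatch.
  def feedbackParallelismMessage(original: Int, feedback: Int): String =
    "Parallelism of the feedback stream must match the parallelism of the original stream. " +
      s"Parallelism of original stream: $original; parallelism of feedback stream: $feedback. " +
      "Parallelism can be modified using DataStream#setParallelism() method"
}
```

For example, `FeedbackMessages.feedbackParallelismMessage(1, 8)` yields the reported numbers followed by the `setParallelism()` hint.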

[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work

2017-09-06 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16155268#comment-16155268
 ] 

Aljoscha Krettek commented on FLINK-7567:
-

Yes, we should in fact report that the parameter is unused, or maybe even remove 
it. (It seems I left it there by accident, in fact.)

As for resolving the problem, I tend to think that we shouldn't do too much 
automatically. In this case we would have to set the parallelism of the iteration 
feedback to 1, which might not be what a user wants. The easiest option is 
therefore to let the user know that there is a problem and let them figure out 
what they want to do.

What message could we give the user? I think the existing message already states 
the problem quite clearly.

> DataStream#iterate() on env.fromElements() / env.fromCollection() does not 
> work
> ---
>
> Key: FLINK-7567
> URL: https://issues.apache.org/jira/browse/FLINK-7567
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.2
> Environment: OS X 10.12.6, Oracle JDK 1.8.0_144, Flink 1.3.2
>Reporter: Peter Ertl
>
> When I try to execute this simple snippet of code
> {code}
>   @Test
>   def iterateOnElements(): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> // do something silly just do get iteration going ...
> val result = env.fromElements(1, 2, 3).iterate(it => {
>   (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x'))
> })
> result.print()
> env.execute()
>   }
> {code}
> I get the following exception:
> {code}
> java.lang.UnsupportedOperationException: Parallelism of the feedback stream must match the parallelism of the original stream. Parallelism of original stream: 1; parallelism of feedback stream: 8
>   at org.apache.flink.streaming.api.transformations.FeedbackTransformation.addFeedbackEdge(FeedbackTransformation.java:87)
>   at org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:77)
>   at org.apache.flink.streaming.api.scala.DataStream.iterate(DataStream.scala:519)
>   at atp.analytics.CassandraReadTest.iterate2(CassandraReadTest.scala:134)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>   at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>   at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}
> Since this is just the simplest iterating stream setup I could imagine, this 
> error makes no sense to me :-P



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work

2017-09-05 Thread Mikhail Lipkovich (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16153629#comment-16153629
 ] 

Mikhail Lipkovich commented on FLINK-7567:
--

Since DataStream.iterate() already has a 'keepPartitioning' flag, it seems to me 
that we could align the parallelism of the input and feedback streams in this 
method when possible, and throw an exception with a clearer message when it's 
not.
I'm new here, so maybe I'm missing something important. What do you think, 
[~aljoscha]?
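A minimal sketch of that proposal, written in plain Scala rather than against Flink's classes. It assumes (hypothetically) that the API can tell whether the user set the feedback operator's parallelism explicitly, modeled here as `explicitParallelism: Option[Int]`; `FeedbackOperator` and `FeedbackAlignment.resolve` are illustrative names only. An unset parallelism is aligned with the input, and an explicitly conflicting one produces a more specific error message.

```scala
// Hypothetical model of the feedback operator. `explicitParallelism` is None
// when the user left it at the environment default; this flag is an
// assumption for illustration, not a Flink API.
final case class FeedbackOperator(name: String, explicitParallelism: Option[Int])

object FeedbackAlignment {
  // Returns the parallelism to use for the feedback stream, or an error
  // message when the user explicitly chose a conflicting value.
  def resolve(inputParallelism: Int, feedback: FeedbackOperator): Either[String, Int] =
    feedback.explicitParallelism match {
      case None => Right(inputParallelism) // not set by the user: align with the input
      case Some(p) if p == inputParallelism => Right(p) // already consistent
      case Some(p) =>
        Left(
          s"The feedback operator '${feedback.name}' has parallelism $p, but the " +
            s"iteration input has parallelism $inputParallelism. Set the feedback " +
            s"stream's parallelism to $inputParallelism or change the input's parallelism.")
    }
}
```

In this sketch, only a parallelism the user never chose is changed silently, which limits the risk of overriding a setting the user actually wants.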



[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work

2017-09-05 Thread Peter Ertl (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16153572#comment-16153572
 ] 

Peter Ertl commented on FLINK-7567:
---

Since this is somewhat non-trivial to detect, shouldn't Flink handle it somehow 
in this case (e.g. by setting the default parallelism to 1)?



[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work

2017-09-05 Thread Mikhail Lipkovich (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16153521#comment-16153521
 ] 

Mikhail Lipkovich commented on FLINK-7567:
--

Hi Peter,
this error is due to this issue:
https://issues.apache.org/jira/browse/FLINK-2398
Input and feedback streams with different parallelism are currently not allowed. 
In your particular example you can change the parallelism of the feedback 
stream:

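{code:scala}
it => {
  // Feedback stream: parallelism pinned to 1 to match the fromElements() source
  val feedback = it.filter(_ > 0).map(_ - 1).setParallelism(1)
  // Output stream: can keep the default parallelism
  val output = it.filter(_ > 0).map(_ => 'x')
  (feedback, output)
}
{code}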

We should probably at least document that the keepPartitioning parameter of 
DataStream.iterate is ignored.

