Hi,

Flink program should have at least one data sink. When your program calls 
`print` method, the method adds a data sink into your program automatically and 
execute it immediately. If you want to run Flink program without calling 
`print` method, you should add a data sink into your program and execute it 
manually by calling `execute` method of `ExecutionEnvironment` object.

Note that only some methods about data sink such as `print`, `count` and 
`collect` execute the program immediately.

There are more detail descriptions about data sink [1] and lazy evaluation [2] 
in Flink documentation. The documentation will help you to understand the 
structure of Flink program.

Regards,
Chiwan Park

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/programming_guide.html#data-sinks
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/programming_guide.html#lazy-evaluation

> On Jul 20, 2015, at 2:59 AM, Madabhattula Rajesh Kumar <mrajaf...@gmail.com> 
> wrote:
> 
> Hi Scahin,
> 
> Thank you for the response. I have commented counts print line. After that I 
> got below exception
> 
> Exception in thread "main" java.lang.RuntimeException: No data sinks have 
> been created yet. A program needs at least one sink that consumes data. 
> Examples are writing the data set or printing it.
>     at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:914)
>     at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:893)
>     at 
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:50)
>     at 
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:590)
>     at WordCount$.main(WordCount.scala:13)
>     at WordCount.main(WordCount.scala)
> 
> Regards,
> Rajesh
> 
> On Sun, Jul 19, 2015 at 8:26 PM, Sachin Goel <sachingoel0...@gmail.com> wrote:
> Hi
> You do not need to call env.execute after doing a print call. Print itself 
> triggers the execution. The reason for the Exception is quite obvious. After 
> the print call, there is no sink for the program execution. So, execution 
> cannot proceed. 
> You can however explicitly define a sink and then call env.execute.
> 
> Cheers!
> Sachin
> 
> -- Sachin Goel
> Computer Science, IIT Delhi
> m. +91-9871457685
> 
> On Sun, Jul 19, 2015 at 8:06 PM, Madabhattula Rajesh Kumar 
> <mrajaf...@gmail.com> wrote:
> Hi,
> 
> I have written simple wordcount program in scala. When I execute the program, 
> I'm getting below exception.
> 
> Please let me know how to fix this issue. I'm using Flink 0.9.0 version
> 
> Below is the program :-
> 
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     // get input data
>     val text = env readTextFile("/Users/hadoop2/Data/word.txt")
>     val counts = text flatMap(l=>l split(" ")) map(word=>(word,1)) groupBy(0) 
> sum(1)
>     // emit result
>     counts print 
>     env.execute("TEST")  
> 
> Exception :-
> 
> Exception in thread "main" java.lang.RuntimeException: No new data sinks have 
> been defined since the last execution. The last execution refers to the 
> latest call to 'execute()', 'count()', 'collect()', or 'print()'.
>     at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:910)
>     at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:893)
>     at 
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:50)
>     at 
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:590)
>     at WordCount$.main(WordCount.scala:17)
>     at WordCount.main(WordCount.scala)
> 
> Regards,
> Rajesh
> 
> 





Reply via email to