Hey,
You are right, I'm also seeing this exception now. It was hidden in other
log output.

The explanation for all this confusion is simple: DataStreamUtils.collect()
behaves like an execute(), because it submits and runs a job itself.

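The stream graph is cleared on each execute(). That's why collect()
followed by execute() leads to the "No operators defined" error: the job
started by collect() has already consumed the graph, so the second
execute() finds it empty.
However, if you call collect(), print(), execute(), then print() fills
the stream graph again, and you are executing two Flink jobs: the
collect job and the execute job.
With print(), collect(), execute(), the print sink simply becomes part of
the collect job, so the final execute() fails again.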
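To make this bookkeeping concrete, here is a hypothetical toy model in
plain Java. It is not Flink code and uses no Flink classes; ToyStreamEnv,
print(), collect(), execute() and run() are names made up for this sketch.
It assumes only what is described above: execute() fails on an empty
graph and otherwise runs one job and clears the graph, and collect() adds
its own sink and then calls execute() itself. The model runs everything
synchronously and ignores the actual data.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, NOT Flink's implementation. It only mimics the
// bookkeeping described above: execute() consumes and clears the pending
// sinks, and collect() adds its own sink and calls execute() itself.
final class ToyStreamEnv {
    // Sinks registered since the last execute(); stands in for the stream graph.
    private final List<String> pendingSinks = new ArrayList<>();
    // Every job that actually ran, recorded as the list of sinks it contained.
    final List<List<String>> jobs = new ArrayList<>();

    // Stand-in for resultDataStream.print(): registers a print sink.
    void print() {
        pendingSinks.add("print");
    }

    // Stand-in for env.execute(): fails on an empty graph,
    // otherwise runs one job and clears the graph.
    void execute() {
        if (pendingSinks.isEmpty()) {
            throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot execute.");
        }
        jobs.add(new ArrayList<>(pendingSinks));
        pendingSinks.clear();
    }

    // Stand-in for DataStreamUtils.collect(): adds a collect sink and triggers a job.
    void collect() {
        pendingSinks.add("collect");
        execute();
    }

    // Runs a comma-separated sequence such as "collect,print,execute"
    // and describes which jobs ran and whether the sequence failed.
    static String run(String steps) {
        ToyStreamEnv env = new ToyStreamEnv();
        try {
            for (String step : steps.split(",")) {
                switch (step) {
                    case "print": env.print(); break;
                    case "collect": env.collect(); break;
                    case "execute": env.execute(); break;
                    default: throw new IllegalArgumentException("unknown step: " + step);
                }
            }
            return "ok, jobs=" + env.jobs;
        } catch (IllegalStateException e) {
            return "failed after jobs=" + env.jobs + ": " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run("collect,execute"));
        System.out.println(run("collect,print,execute"));
        System.out.println(run("print,collect,execute"));
    }
}
```

Running main prints one line per variant from this thread:

failed after jobs=[[collect]]: No operators defined in streaming topology. Cannot execute.
ok, jobs=[[collect], [print]]
failed after jobs=[[print, collect]]: No operators defined in streaming topology. Cannot execute.

Under this model, a trailing execute() after collect() is only useful if
something was added to the graph after collect() returned.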

I hope I got it right this time :)

Best,
Robert

On Fri, Feb 21, 2020 at 4:47 PM Niels Basjes <ni...@basjes.nl> wrote:

> I tried this in Flink 1.10.0:
>
>     @Test
>     public void experimentalTest() throws Exception {
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         DataStream<String> input = env.fromElements("One", "Two");
> //        DataStream<String> input = env.addSource(new StringSourceFunction());
>         List<String> result = new ArrayList<>(5);
>         DataStreamUtils.collect(input).forEachRemaining(result::add);
>         env.execute("Flink Streaming Java API Skeleton");
>     }
>
>
> Results in:
>
> java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
>
>       at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792)
>       at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783)
>       at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768)
>       at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>       at nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.experimentalTest(TestUserAgentAnalysisMapperInline.java:177)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> ...
>
>
>
> On Fri, Feb 21, 2020 at 1:00 PM Robert Metzger <rmetz...@apache.org> wrote:
>
>> Hey Niels,
>>
>> This minimal Flink job executes in Flink 1.10:
>>
>> public static void main(String[] args) throws Exception {
>>    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>    DataStream<String> input = env.addSource(new StringSourceFunction());
>>    List<String> result = new ArrayList<>(5);
>>    DataStreamUtils.collect(input).forEachRemaining(result::add);
>>    env.execute("Flink Streaming Java API Skeleton");
>> }
>>
>> Maybe the TestUserAgentAnalysisMapperInline class is doing some magic
>> that breaks with the StreamGraphGenerator?
>>
>> Best,
>> Robert
>>
>> On Tue, Feb 18, 2020 at 9:59 AM Niels Basjes <ni...@basjes.nl> wrote:
>>
>>> Hi Gordon,
>>>
>>> Thanks. This works for me.
>>>
>>> I find it strange that this works (I made the differences bold):
>>>
>>> List<TestRecord> result = new ArrayList<>(5);
>>> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>>> *resultDataStream.print();*
>>> environment.execute();
>>>
>>>
>>> however, this does not work:
>>>
>>> List<TestRecord> result = new ArrayList<>(5);
>>> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>>> environment.execute();
>>>
>>>
>>> and this also does not work:
>>>
>>> *resultDataStream.print();*
>>> List<TestRecord> result = new ArrayList<>(5);
>>> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>>> environment.execute();
>>>
>>>
>>> In both cases it fails with:
>>>
>>>
>>> java.lang.IllegalStateException: *No operators defined in streaming topology. Cannot execute.*
>>>
>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792)
>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783)
>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768)
>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1602)
>>> at nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.testInlineDefinitionDataStream(TestUserAgentAnalysisMapperInline.java:144)
>>>
>>>
>>>
>>> Did I do something wrong?
>>> Is this a bug in DataStreamUtils?
>>>
>>> Niels Basjes
>>>
>>>
>>>
>>> On Mon, Feb 17, 2020 at 8:56 AM Tzu-Li Tai <tzuli...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> To collect the elements of a DataStream (usually only meant for testing
>>>> purposes), you can take a look at `DataStreamUtils#collect(DataStream)`.
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>>
>>>>
>>>> --
>>>> Sent from:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>
>>>
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>>
>>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>
