Quick start guide

2017-09-03 Thread Michael Fong
Hi,

I was following the quick start guide in the official documentation,
and I came across a line that caused a bit of confusion:
$ tail -f log/flink-*-jobmanager-*.out

The guide says the WordCount program prints its output to that file.
However, when I ran the code locally (master branch, in the IDE), I found the
output in taskmanager-*.out instead:

==> log/flink--*taskmanager*-0-out <==
ddd : 1
ccc : 1
bbb : 2
abc : 2
aba : 1
aab : 1
ddd : 1
aaa : 1
abca : 1
abac : 6


Is this the intended behavior, or a typo in the documentation? Thanks in
advance.

Regards,


Re: termination of stream#iterate on finite streams

2017-09-03 Thread Peter Ertl
Hi Xingcan!

If a _finite_ stream emitted a special trailing "End-Of-Stream" message at
its end, and that message flowed down through the operators, wouldn't this let
us end the iteration deterministically, without needing a timeout?

Having an arbitrary timeout that must be longer than any single iteration step
seems really awkward.

What do you think?

Best regards
Peter
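To make the two termination strategies in this thread concrete, here is a minimal, Flink-free Scala sketch of an iteration head draining its feedback channel. It assumes the channel is a `java.util.concurrent.LinkedBlockingQueue` (the reply below only says "a blocking queue"; the exact class is an assumption). `drainWithTimeout` stops when `poll` returns `null` after waiting `timeoutMs`, mirroring the timeout approach; `drainUntilEndOfStream` stops on a hypothetical `EndOfStream` marker, mirroring the proposal above. `Msg`, `Record`, `EndOfStream` and `FeedbackHead` are illustrative names, not Flink API, and a real head would forward records downstream rather than just collect them.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ListBuffer

// Hypothetical feedback-channel messages (illustrative only, not Flink API).
sealed trait Msg
final case class Record(value: Long) extends Msg
case object EndOfStream extends Msg

object FeedbackHead {
  // Timeout-based termination: the head cannot know whether more feedback
  // will ever arrive, so it stops once timeoutMs pass without a new message.
  def drainWithTimeout(queue: LinkedBlockingQueue[Msg], timeoutMs: Long): List[Long] = {
    val seen = ListBuffer.empty[Long]
    var done = false
    while (!done) {
      queue.poll(timeoutMs, TimeUnit.MILLISECONDS) match {
        case null        => done = true // nothing arrived within the timeout
        case Record(v)   => seen += v
        case EndOfStream => ()          // this strategy ignores markers
      }
    }
    seen.toList
  }

  // Marker-based termination (the proposal): block until the next message
  // and stop deterministically as soon as the end-of-stream marker arrives.
  def drainUntilEndOfStream(queue: LinkedBlockingQueue[Msg]): List[Long] = {
    val seen = ListBuffer.empty[Long]
    var done = false
    while (!done) {
      queue.take() match {
        case Record(v)   => seen += v
        case EndOfStream => done = true
      }
    }
    seen.toList
  }
}
```

With the timeout variant, choosing `timeoutMs` is exactly the awkward trade-off described above: too short and a slow iteration step is cut off, too long and the job idles before finishing. The marker variant avoids that, but only under the assumption that the marker is enqueued after every in-flight feedback record; in a real cyclic dataflow, deciding when that holds is the hard part.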


> On 02.09.2017 at 17:16, Xingcan Cui wrote:
> 
> Hi Peter,
> 
> I overlooked the filter part. Sorry about that.
> 
> Actually, as the javadoc explains, a DataStream with an iteration will by
> default never terminate. That's because, in a streaming environment with
> iteration, the operator can never know whether the feedback stream has
> reached its end (even after the data source has terminated, further data may
> still arrive), and that's why it needs a timeout value to make that decision,
> just like many other network calls. In other words, you know the feedback
> stream will eventually stay empty, but the operator doesn't. So we give it a
> maximum waiting time for the next record.
> 
> Internally, this mechanism is implemented via a blocking queue (the related
> code can be found here).
> 
> I hope I've covered everything this time :)
> 
> Best,
> Xingcan
> 
> On Sat, Sep 2, 2017 at 10:08 PM, Peter Ertl wrote:
> 
>> On 02.09.2017 at 04:45, Xingcan Cui wrote:
>> 
>> In your code, all the long values will be decremented by 1 and sent back to
>> the iterate operator, endlessly.
> 
> 
> Is this true? Shouldn't
>   val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
>     // emit meaningless 'y' chars just to produce some output
>     (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y'))
>   })
>   iterationResult2.print()
> 
> produce the following _feedback_ streams?
> 
> initial input to #iterate(): [1 2 3 4]
> 
> iteration #1 : [1 2 3]
> iteration #2 : [1 2]
> iteration #3 : [1]
> iteration #4 : []  => empty feedback stream => causes termination? (which
> actually only happens when a timeout value is set)
> 
> Best regards
> Peter
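The hand trace above can be checked with a Flink-free model of the same step function using plain Scala collections. This is a simplification and an assumption: each round applies the step to the whole previous feedback as one batch, whereas a real streaming job interleaves records from different rounds, and nothing here models how the operator detects termination. `FeedbackTrace`, `step` and `feedbackRounds` are hypothetical names.

```scala
// Flink-free model of the step function passed to iterate() above.
// Each round processes the entire previous feedback as a batch
// (a simplification of real streaming behavior).
object FeedbackTrace {
  // Same filter/map shape as the step function: (feedback, output)
  def step(in: List[Long]): (List[Long], List[Char]) =
    (in.filter(_ > 0).map(_ - 1), in.filter(_ > 0).map(_ => 'y'))

  // Collects the feedback produced by each round, stopping after the
  // first round whose feedback is empty.
  def feedbackRounds(initial: List[Long]): List[List[Long]] = {
    val rounds = List.newBuilder[List[Long]]
    var current = initial
    while (current.nonEmpty) {
      val (feedback, _) = step(current)
      rounds += feedback
      current = feedback
    }
    rounds.result()
  }

  def main(args: Array[String]): Unit =
    feedbackRounds(List(1L, 2L, 3L, 4L)).zipWithIndex.foreach { case (fb, i) =>
      println(s"iteration #${i + 1} : ${fb.mkString("[", " ", "]")}")
    }
}
```

Running `main` prints the feedback rounds [0 1 2 3], [0 1 2], [0 1], [0] and finally []. Unlike the hand trace, the model keeps the 0 values that `map(_ - 1)` emits; they are only dropped by the next round's filter, so the feedback runs dry one round later. Either way the feedback does become empty; the open question in the thread is how the operator can know it will stay empty.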


