Re: Flink not outputting windows before all data is seen

2020-09-01 Thread David Anderson
Teodor,

I've concluded this is a bug, and have reported it:
https://issues.apache.org/jira/browse/FLINK-19109

Best regards,
David

On Sun, Aug 30, 2020 at 3:01 PM Teodor Spæren wrote:

Re: Flink not outputting windows before all data is seen

2020-08-30 Thread Teodor Spæren

Hey again David!

I tried your proposed change of setting the parallelism higher. This
worked, but why does it fix the behavior? I don't understand why it
would. The only thing that changes in the query plan is that a
"remapping" node is added.


Thanks for the fix, and for any additional answer :)
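One possible reading of the "remapping" node, offered as an assumption rather than something this thread confirms: when two connected operators have the same parallelism and no partitioning is specified, Flink connects them with a FORWARD edge and can chain them into a single task. When their parallelism differs, it uses a REBALANCE edge instead, which shows up as an extra step in the plan and prevents chaining. The toy model below captures only that rule; the class and method names are hypothetical, and Flink's real chaining check considers more conditions.

```java
// Hypothetical, heavily simplified model of two Flink job-graph rules.
// This is NOT Flink's implementation; real chaining also checks the chaining
// strategy, slot sharing group, whether chaining is enabled, and more.
class ChainingSketch {

    /** Partitioner picked for an edge when the program does not choose one. */
    static String defaultPartitioner(int upstreamParallelism, int downstreamParallelism) {
        // Same parallelism: subtask i sends to subtask i (FORWARD).
        // Different parallelism: records are spread round-robin (REBALANCE).
        return upstreamParallelism == downstreamParallelism ? "FORWARD" : "REBALANCE";
    }

    /** In this model, only a FORWARD edge allows the two operators to be chained. */
    static boolean canChain(int upstreamParallelism, int downstreamParallelism) {
        return "FORWARD".equals(defaultPartitioner(upstreamParallelism, downstreamParallelism));
    }

    public static void main(String[] args) {
        // Two operators both at parallelism 1: one chained task.
        System.out.println(defaultPartitioner(1, 1) + ", chained=" + canChain(1, 1));
        // Downstream raised to a (hypothetical) parallelism of 4: a REBALANCE edge separates them.
        System.out.println(defaultPartitioner(1, 4) + ", chained=" + canChain(1, 4));
    }
}
```

Whether breaking a chain is what sidesteps the bug David reported as FLINK-19109 is not established in this thread.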

Best regards,
Teodor

On Sun, Aug 30, 2020 at 12:29:31PM +0200, Teodor Spæren wrote:

Re: Flink not outputting windows before all data is seen

2020-08-30 Thread Teodor Spæren

Hey David!

I tried what you said, but it did not solve the problem. The job still 
has to wait until the very end before outputting anything.


I mentioned in my original email that I had set the parallelism to 1 
job-wide, but when I reran the job I added your line as well. Are there 
any circumstances where, despite the global level being set to 1, you 
still need to set the level on individual operators?


PS: I sent this to you directly; I'm sorry about that.

Best regards,
Teodor

On Sat, Aug 29, 2020 at 08:37:48PM +0200, David Anderson wrote:


Re: Flink not outputting windows before all data is seen

2020-08-29 Thread David Anderson
Teodor,

This is happening because of the way readTextFile works when it executes
in parallel: it divides the input file into splits, which are consumed in
parallel. As a result, the watermark isn't able to move forward until much,
or perhaps all, of the file has been read. If you change the parallelism of
the source to 1, like this

    final DataStream<String> linesIn =
        env.readTextFile(fileNameInput).setParallelism(1);

then you should see the job make steady forward progress with windows
closing on a regular basis.
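To make the watermark behavior concrete, here is a minimal sketch, assuming the standard rule that an operator with several parallel inputs advances its event-time clock only to the smallest watermark it has received from those inputs. The class name, the four-reader setup, and the hour values are hypothetical, and Flink's handling of idle inputs is ignored.

```java
import java.util.Arrays;

// Simplified model (not Flink code): an operator fed by several parallel
// inputs can only advance its event-time clock to the smallest watermark
// it has received across those inputs.
class MinWatermarkSketch {

    /** Combined watermark: minimum of the latest watermark from each input (MIN_VALUE if none). */
    static long combinedWatermark(long[] latestWatermarkPerInput) {
        return Arrays.stream(latestWatermarkPerInput).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        // Hypothetical: four parallel readers, each in a different split of a
        // time-sorted file, so their watermarks are far apart.
        long[] readers = {2 * hour, 200 * hour, 400 * hour, 600 * hour};
        System.out.println(combinedWatermark(readers) / hour); // 2

        // A reader that has not emitted any watermark yet counts as Long.MIN_VALUE,
        // which holds the combined watermark back entirely.
        long[] withSilentReader = {200 * hour, Long.MIN_VALUE};
        System.out.println(combinedWatermark(withSilentReader) == Long.MIN_VALUE); // true
    }
}
```

In this model, a window can only close once the slowest reader has passed it, which is consistent with windows closing late when splits are read in parallel.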

Regards,
David

On Sat, Aug 29, 2020 at 4:59 PM Teodor Spæren wrote:


Flink not outputting windows before all data is seen

2020-08-29 Thread Teodor Spæren

Hey!

Second time posting to a mailing list, let's hope I'm doing this 
correctly :)


My use case is to take data from the MediaWiki dumps and stream it into 
Flink via the `readTextFile` method. The dumps are TSV files with one 
event per line, and each event has a timestamp and a type. I want to use 
event-time processing and simply print out how many events of each type 
there are per hour. The data can be out of order, so I allow 1 hour of 
tolerance.


What I expect to happen is that, as it goes through a month of data, it 
prints out each hour once the watermark has passed it, so I get output 
continuously until the end.
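As a worked example of that expectation, here is a sketch of the arithmetic, assuming Flink's bounded-out-of-orderness strategy emits a watermark of `maxTimestampSeen - outOfOrderness - 1` and that a tumbling window `[start, end)` fires once the watermark reaches `end - 1`. It ignores that watermarks are emitted periodically rather than per record, and all names are hypothetical.

```java
// Arithmetic sketch (not Flink code) of when a 1-hour tumbling event-time
// window fires under a 1-hour bounded-out-of-orderness watermark.
// Assumed rules:
//   watermark = maxTimestampSeen - outOfOrderness - 1
//   window [start, end) fires once watermark >= end - 1
class HourlyWindowSketch {
    static final long HOUR = 3_600_000L; // milliseconds

    /** Watermark after seeing events up to maxTimestampSeen (epoch millis). */
    static long watermark(long maxTimestampSeen, long outOfOrdernessMs) {
        return maxTimestampSeen - outOfOrdernessMs - 1;
    }

    /** Start of the tumbling window containing ts (non-negative ts, zero offset). */
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    /** True once the watermark has reached the window's last millisecond. */
    static boolean fires(long windowEnd, long currentWatermark) {
        return currentWatermark >= windowEnd - 1;
    }

    /** Smallest event timestamp that makes the window containing ts fire. */
    static long firstTimestampThatCloses(long ts) {
        long end = windowStart(ts, HOUR) + HOUR;
        // watermark(t, HOUR) >= end - 1  <=>  t - HOUR - 1 >= end - 1  <=>  t >= end + HOUR
        return end + HOUR;
    }

    public static void main(String[] args) {
        long event = 30 * 60_000L;                  // an event at 00:30
        long end = windowStart(event, HOUR) + HOUR; // its window ends at 01:00
        long closer = firstTimestampThatCloses(event);
        System.out.println(closer / HOUR);                           // 2
        System.out.println(fires(end, watermark(closer - 1, HOUR))); // false
        System.out.println(fires(end, watermark(closer, HOUR)));     // true
    }
}
```

Under these assumptions, each hourly window should fire once the input has reached one hour (the tolerance) past the window's end.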


What really happens is that the program outputs nothing until it is done, 
and then it outputs everything at once. The timestamp shown in the web UI 
is also stuck at 9223372036854776000. If I switch to CountWindows instead 
of time windows, it outputs continuously, as I would expect, so the 
problem seems to be watermark-related.
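The number 9223372036854776000 can be decoded. A likely reading (an assumption; the thread does not say which operator showed it) is that it is `Long.MAX_VALUE`, the value of Flink's final "end of input" watermark, after conversion to a 64-bit floating-point number for display, as JavaScript does. The check below (class name hypothetical) confirms the arithmetic.

```java
import java.math.BigDecimal;

// Shows that 9223372036854776000 is Long.MAX_VALUE after a round trip through
// a double, printed without an exponent (similar to how a browser shows it).
class MaxWatermarkDisplay {

    /** Converts a long to double and prints that double in plain notation. */
    static String asDisplayedDouble(long value) {
        // Double.toString((double) Long.MAX_VALUE) is "9.223372036854776E18";
        // BigDecimal.toPlainString expands it without the exponent.
        return new BigDecimal(Double.toString((double) value)).toPlainString();
    }

    public static void main(String[] args) {
        System.out.println(Long.MAX_VALUE);                    // 9223372036854775807
        System.out.println(asDisplayedDouble(Long.MAX_VALUE)); // 9223372036854776000
    }
}
```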


I'm running Flink version 1.11.1 on JVM version:

OpenJDK 64-Bit Server VM - GraalVM Community - 11/11.0.7+10-jvmci-20.1-b02

Parallelism is set to 1, and it's running on my laptop.


I don't know how much code I'm allowed to attach here, so I've created a 
GitHub repo with a complete, self-contained example [1]. To get the data, 
run the following commands:


$ wget https://dumps.wikimedia.org/other/mediawiki_history/2020-07/enwiki/2020-07.enwiki.2016-04.tsv.bz2
$ pv -cN source < 2020-07.enwiki.2016-04.tsv.bz2 | bzcat | pv -cN bzcat | sort -k4 > 2020-07.enwiki.2016-04.sorted.tsv

If you don't have pv installed, just remove those parts; I just like to 
have a progress overview.



The main code part is this:

package org.example.prow;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.example.prow.wikimedia.Event;

import java.time.Duration;

public class App {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable event-time processing (Flink 1.11 API).
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        final String fileNameInput =
                "file:///home/rhermes/madsci/thesis/data/mediawiki_history/2020-07.enwiki.2016-04.sorted.tsv";
        // One String per TSV line.
        final DataStream<String> linesIn = env.readTextFile(fileNameInput);

        // Parse each line into an Event.
        final SingleOutputStreamOperator<Event> jj = linesIn.map(value -> new Event(value));

        // Tolerate up to 1 hour of out-of-orderness; timestamps in epoch milliseconds.
        final WatermarkStrategy<Event> mew = WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofHours(1))
                .withTimestampAssigner((element, recordTimestamp) ->
                        element.eventTimestamp.toEpochSecond() * 1000);

        final DataStream<Event> props = jj.assignTimestampsAndWatermarks(mew);

        final KeyedStream<Event, String> praps = props.keyBy(e -> e.eventEntity.toString());

        // Hourly tumbling event-time windows per key.
        praps.window(TumblingEventTimeWindows.of(Time.hours(1))).sum("something").print("JAJ!");

        env.execute("FlinkWikipediaHistoryTopEditors");
    }
}

If you see any errors here, please tell me; this is sort of driving me 
mad >_<.


Best regards,
Teodor Spæren

[1] https://github.com/rHermes/flink-question-001