Teodor, I've concluded this is a bug, and have reported it: https://issues.apache.org/jira/browse/FLINK-19109
Best regards,
David

On Sun, Aug 30, 2020 at 3:01 PM Teodor Spæren <teodor_spae...@riseup.net> wrote:

> Hey again David!
>
> I tried your proposed change of setting the parallelism higher. This
> worked, but why does this fix the behavior? I don't understand why this
> would fix it. The only thing that happens to the query plan is that a
> "remapping" node is added.
>
> Thanks for the fix, and for any additional answer :)
>
> Best regards,
> Teodor
>
> On Sun, Aug 30, 2020 at 12:29:31PM +0200, Teodor Spæren wrote:
> >Hey David!
> >
> >I tried what you said, but it did not solve the problem. The job still
> >has to wait until the very end before outputting anything.
> >
> >I mentioned in my original email that I had set the parallelism to 1
> >job wide, but when I reran the task, I added your line. Are there any
> >circumstances where, despite having the global level set to 1, you
> >still need to set the level on individual operators?
> >
> >PS: I sent this to you directly. I'm sorry about that.
> >
> >Best regards,
> >Teodor
> >
> >On Sat, Aug 29, 2020 at 08:37:48PM +0200, David Anderson wrote:
> >>Teodor,
> >>
> >>This is happening because of the way that readTextFile works when it is
> >>executing in parallel, which is to divide the input file into a bunch of
> >>splits, which are consumed in parallel. This makes it so that the
> >>watermark isn't able to move forward until much or perhaps all of the
> >>file has been read. If you change the parallelism of the source to 1,
> >>like this
> >>
> >>    final DataStream<String> linesIn =
> >>        env.readTextFile(fileNameInput).setParallelism(1);
> >>
> >>then you should see the job make steady forward progress with windows
> >>closing on a regular basis.
> >>
> >>Regards,
> >>David
> >>
> >>On Sat, Aug 29, 2020 at 4:59 PM Teodor Spæren <teodor_spae...@riseup.net>
> >>wrote:
> >>
> >>>Hey!
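[Editor's note: the mechanics David describes can be illustrated without Flink. The sketch below is a hypothetical plain-Java simulation, not Flink source code: it models Flink's documented behavior that a bounded-out-of-orderness generator emits `maxSeenTimestamp - delay - 1`, and that downstream operators combine the watermarks of parallel inputs by taking the minimum. The class and method names are invented for illustration.]

```java
// Simulation of watermark progress with parallel file splits (assumed model).
public class WatermarkSim {

    // A bounded-out-of-orderness watermark trails the largest timestamp
    // seen so far by the configured delay (minus one millisecond).
    static long watermark(long maxSeenTs, long delayMs) {
        return maxSeenTs - delayMs - 1;
    }

    // Downstream operators advance only to the minimum watermark
    // across all of their parallel input channels.
    static long combined(long... watermarks) {
        long min = Long.MAX_VALUE;
        for (long w : watermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        // One source instance has read 30 hours into its split,
        // another has only reached the 2-hour mark of its split:
        long fast = watermark(30 * hour, hour);
        long slow = watermark(2 * hour, hour);
        // The combined watermark is pinned near the slow split, so
        // one-hour event-time windows cannot close yet — which is why
        // reading splits in parallel stalls output until the end.
        System.out.println(combined(fast, slow)); // prints 3599999
    }
}
```

With a single source instance (parallelism 1) there is only one watermark, so it advances steadily as the sorted file is read, and windows close as the email thread describes.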
> >>>
> >>>Second time posting to a mailing list, let's hope I'm doing this
> >>>correctly :)
> >>>
> >>>My use case is to take data from the mediawiki dumps and stream it into
> >>>Flink via the `readTextFile` method. The dumps are TSV files with an
> >>>event per line; each event has a timestamp and a type. I want to use
> >>>event time processing and simply print out how many of each event type
> >>>there are per hour. The data can be out of order, so I have 1 hour of
> >>>tolerance.
> >>>
> >>>What I expect to happen here is that as it goes through a month of data,
> >>>it will print out the hours as the watermark passes 1 hour. So I'll get
> >>>output continuously until the end.
> >>>
> >>>What really happens is that the program outputs nothing until it is done,
> >>>and then it outputs everything. The timestamp is also stuck at
> >>>9223372036854776000 in the web management. If I switch to using
> >>>count windows instead of time windows, it outputs continuously like I
> >>>would expect it to, so it seems to be watermark related.
> >>>
> >>>I'm running Flink version 1.11.1 on JVM version:
> >>>
> >>>OpenJDK 64-Bit Server VM - GraalVM Community - 11/11.0.7+10-jvmci-20.1-b02
> >>>
> >>>The parallelism setting is 1 and it's running on my laptop.
> >>>
> >>>I don't know how much code I'm allowed to attach here, so I've created a
> >>>GitHub repo with the complete self-standing example [1]. To get the data
> >>>used, run the following commands:
> >>>
> >>>$ wget https://dumps.wikimedia.org/other/mediawiki_history/2020-07/enwiki/2020-07.enwiki.2016-04.tsv.bz2
> >>>$ pv -cN source < 2020-07.enwiki.2016-04.tsv.bz2 | bzcat | pv -cN bzcat | sort -k4 > 2020-07.enwiki.2016-04.sorted.tsv
> >>>
> >>>If you don't have pv installed, just remove that part; I just like to
> >>>have an overview.
> >>>
> >>>The main code part is this:
> >>>
> >>>package org.example.prow;
> >>>
> >>>import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> >>>import org.apache.flink.streaming.api.TimeCharacteristic;
> >>>import org.apache.flink.streaming.api.datastream.DataStream;
> >>>import org.apache.flink.streaming.api.datastream.KeyedStream;
> >>>import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> >>>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> >>>import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> >>>import org.apache.flink.streaming.api.windowing.time.Time;
> >>>import org.example.prow.wikimedia.Event;
> >>>
> >>>import java.time.Duration;
> >>>
> >>>public class App {
> >>>    public static void main(String[] args) throws Exception {
> >>>        final StreamExecutionEnvironment env =
> >>>            StreamExecutionEnvironment.getExecutionEnvironment();
> >>>        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >>>
> >>>        final String fileNameInput =
> >>>            "file:///home/rhermes/madsci/thesis/data/mediawiki_history/2020-07.enwiki.2016-04.sorted.tsv";
> >>>        final DataStream<String> linesIn = env.readTextFile(fileNameInput);
> >>>
> >>>        final SingleOutputStreamOperator<Event> jj =
> >>>            linesIn.map(value -> new Event(value));
> >>>
> >>>        final WatermarkStrategy<Event> mew = WatermarkStrategy
> >>>            .<Event>forBoundedOutOfOrderness(Duration.ofHours(1))
> >>>            .withTimestampAssigner((element, recordTimestamp) ->
> >>>                element.eventTimestamp.toEpochSecond() * 1000);
> >>>
> >>>        final DataStream<Event> props = jj.assignTimestampsAndWatermarks(mew);
> >>>
> >>>        final KeyedStream<Event, String> praps =
> >>>            props.keyBy(e -> e.eventEntity.toString());
> >>>
> >>>        praps.window(TumblingEventTimeWindows.of(Time.hours(1)))
> >>>            .sum("something")
> >>>            .print("JAJ!");
> >>>
> >>>        env.execute("FlinkWikipediaHistoryTopEditors");
> >>>    }
> >>>}
> >>>
> >>>If you see
> >>>any errors here, please tell me; this is sort of driving me mad >_<.
> >>>
> >>>Best regards,
> >>>Teodor Spæren
> >>>
> >>>[1] https://github.com/rHermes/flink-question-001
> >>>