Teodor,

I've concluded this is a bug, and have reported it:
https://issues.apache.org/jira/browse/FLINK-19109

Best regards,
David

On Sun, Aug 30, 2020 at 3:01 PM Teodor Spæren <teodor_spae...@riseup.net>
wrote:

> Hey again David!
>
> I tried your proposed change of setting the parallelism higher. This
> worked, but why does this fix the behavior? I don't understand why this
> would fix it. The only thing that happens to the query plan is that a
> "remapping" node is added.
>
> Thanks for the fix, and for any additional answer :)
>
> Best regards,
> Teodor
>
> On Sun, Aug 30, 2020 at 12:29:31PM +0200, Teodor Spæren wrote:
> >Hey David!
> >
> >I tried what you said, but it did not solve the problem. The job still
> >has to wait until the very end before outputting anything.
> >
> >I mentioned in my original email that I had set the parallelism to 1
> >job-wide, but when I reran the task, I added your line. Are there any
> >circumstances where despite having the global level set to 1, you
> >still need to set the level on individual operators?
> >
> >PS: I sent this to you directly; I'm sorry about that.
> >
> >Best regards,
> >Teodor
> >
> >On Sat, Aug 29, 2020 at 08:37:48PM +0200, David Anderson wrote:
> >>Teodor,
> >>
> >>This is happening because of the way that readTextFile works when it is
> >>executing in parallel, which is to divide the input file into a bunch of
> >>splits, which are consumed in parallel. This is making it so that the
> >>watermark isn't able to move forward until much or perhaps all of the file
> >>has been read. If you change the parallelism of the source to 1, like this
> >>
> >>    final DataStream<String> linesIn =
> >>        env.readTextFile(fileNameInput).setParallelism(1);
> >>
> >>then you should see the job make steady forward progress with windows
> >>closing on a regular basis.
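To make the mechanism described above concrete: an operator with several input channels advances its event-time clock to the minimum of the watermarks it has received on those channels. Below is a minimal sketch of that rule in plain Java, with no Flink dependency. The class and method names are made up for illustration, and this shows the general mechanism only, not necessarily what goes wrong in this particular job.

```java
import java.util.Arrays;

// Hypothetical illustration class; not part of Flink's API.
final class MinWatermarkSketch {
    private final long[] channelWatermarks;

    MinWatermarkSketch(int channels) {
        channelWatermarks = new long[channels];
        // Until a channel reports a watermark, it holds everything back.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one channel and returns the combined
    // watermark, which is the minimum over all channels.
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        MinWatermarkSketch op = new MinWatermarkSketch(3);
        System.out.println(op.onWatermark(0, 1_000L)); // two channels still silent
        System.out.println(op.onWatermark(1, 5_000L)); // one channel still silent
        System.out.println(op.onWatermark(2, 3_000L)); // now bounded by channel 0
    }
}
```

In this sketch, a single slow or silent channel is enough to keep the combined watermark from moving, which is the effect described above for parallel splits.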
> >>
> >>Regards,
> >>David
> >>
> >>On Sat, Aug 29, 2020 at 4:59 PM Teodor Spæren <teodor_spae...@riseup.net>
> >>wrote:
> >>
> >>>Hey!
> >>>
> >>>Second time posting to a mailing list, let's hope I'm doing this
> >>>correctly :)
> >>>
> >>>My use case is to take data from the MediaWiki dumps and stream it into
> >>>Flink via the `readTextFile` method. The dumps are TSV files with one
> >>>event per line; each event has a timestamp and a type. I want to use
> >>>event time processing and simply print out how many events of each type
> >>>there are per hour. The data can be out of order, so I have a 1 hour
> >>>tolerance.
> >>>
> >>>What I expect to happen here is that as it goes through a month of data,
> >>>it will print out the hours as the watermark passes 1 hour. So I'll get
> >>>output continuously until the end.
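For reference, the expectation above can be checked with a little arithmetic. Here is a minimal sketch in plain Java (no Flink dependency; the class and method names are hypothetical). It assumes a bounded-out-of-orderness watermark that trails the highest timestamp seen by the bound minus one millisecond, and an event-time window that fires once the watermark reaches the window's last millisecond.

```java
import java.time.Duration;

// Hypothetical illustration class; only the arithmetic is meant to
// mirror the bounded-out-of-orderness strategy used in the job.
final class WatermarkSketch {

    // The watermark trails the highest timestamp seen so far by the
    // bound, minus one millisecond.
    static long watermarkFor(long maxTimestampSeen, Duration bound) {
        return maxTimestampSeen - bound.toMillis() - 1;
    }

    // Start of the tumbling window (no offset) that contains a
    // non-negative timestamp.
    static long windowStart(long timestamp, Duration size) {
        return timestamp - (timestamp % size.toMillis());
    }

    // A window [start, start + size) fires once the watermark reaches
    // its last included millisecond.
    static boolean fires(long windowStart, Duration size, long watermark) {
        return watermark >= windowStart + size.toMillis() - 1;
    }

    public static void main(String[] args) {
        Duration hour = Duration.ofHours(1);
        long h = hour.toMillis();
        long start = windowStart(10 * 60 * 1000L, hour); // event at 00:10

        // Highest timestamp seen is 01:59:59.999: the 00:00 window stays open.
        System.out.println(fires(start, hour, watermarkFor(2 * h - 1, hour)));
        // Highest timestamp seen is 02:00:00.000: the 00:00 window can fire.
        System.out.println(fires(start, hour, watermarkFor(2 * h, hour)));
    }
}
```

Under these assumptions, with a 1 hour bound the 00:00 window becomes eligible to fire as soon as an event stamped 02:00 or later has been read, so continuous output is the reasonable thing to expect here.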
> >>>
> >>>What really happens is that the program outputs nothing until it is done
> >>>and then it outputs everything. The timestamp is also stuck at
> >>>9223372036854776000 in the web management. If I switch to using
> >>>CountWindows instead of time windows, it outputs continuously like I
> >>>would expect it to, so it seems to be watermark related.
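A side note on the 9223372036854776000 figure: its magnitude matches Long.MAX_VALUE (and Long.MIN_VALUE) after rounding through a 64-bit double. Flink uses Long.MIN_VALUE for "no watermark yet" and Long.MAX_VALUE for the final end-of-input watermark. That the web UI handles the number as a double is my assumption, not something confirmed in this thread. A minimal sketch of the rounding in plain Java, with hypothetical names:

```java
import java.math.BigDecimal;

// Hypothetical illustration class; shows how a long looks after a
// round trip through a double, printed without an exponent.
final class WatermarkDisplaySketch {

    static String viaDouble(long value) {
        double d = (double) value; // rounds to the nearest representable double
        // Double.toString yields the shortest decimal that identifies d;
        // BigDecimal is only used to print it without scientific notation.
        return new BigDecimal(Double.toString(d)).toPlainString();
    }

    public static void main(String[] args) {
        System.out.println(viaDouble(Long.MAX_VALUE)); // 9223372036854776000
        System.out.println(viaDouble(Long.MIN_VALUE)); // -9223372036854776000
    }
}
```

Read that way, the displayed value would be a sentinel rather than a real event timestamp, which fits a watermark that never advances during the run.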
> >>>
> >>>I'm running Flink version 1.11.1 on JVM version:
> >>>
> >>>OpenJDK 64-Bit Server VM - GraalVM Community - 11/11.0.7+10-jvmci-20.1-b02
> >>>
> >>>The parallel setting is 1 and it's running on my laptop.
> >>>
> >>>
> >>>I don't know how much code I'm allowed to attach here, so I've created a
> >>>GitHub repo with the complete, self-contained example [1]. To get the data
> >>>used, run the following commands:
> >>>
> >>>$ wget https://dumps.wikimedia.org/other/mediawiki_history/2020-07/enwiki/2020-07.enwiki.2016-04.tsv.bz2
> >>>$ pv -cN source < 2020-07.enwiki.2016-04.tsv.bz2 | bzcat | pv -cN bzcat | sort -k4 > 2020-07.enwiki.2016-04.sorted.tsv
> >>>
> >>>If you don't have pv installed, just remove that part; I just like to
> >>>have an overview.
> >>>
> >>>
> >>>The main code part is this:
> >>>
> >>>package org.example.prow;
> >>>
> >>>import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> >>>import org.apache.flink.streaming.api.TimeCharacteristic;
> >>>import org.apache.flink.streaming.api.datastream.DataStream;
> >>>import org.apache.flink.streaming.api.datastream.KeyedStream;
> >>>import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> >>>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> >>>import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> >>>import org.apache.flink.streaming.api.windowing.time.Time;
> >>>import org.example.prow.wikimedia.Event;
> >>>
> >>>import java.time.Duration;
> >>>
> >>>public class App {
> >>>    public static void main(String[] args) throws Exception {
> >>>        final StreamExecutionEnvironment env =
> >>>            StreamExecutionEnvironment.getExecutionEnvironment();
> >>>        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >>>
> >>>        final String fileNameInput =
> >>>            "file:///home/rhermes/madsci/thesis/data/mediawiki_history/2020-07.enwiki.2016-04.sorted.tsv";
> >>>        final DataStream<String> linesIn = env.readTextFile(fileNameInput);
> >>>
> >>>        final SingleOutputStreamOperator<Event> jj =
> >>>            linesIn.map(value -> new Event(value));
> >>>
> >>>        final WatermarkStrategy<Event> mew =
> >>>            WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofHours(1))
> >>>                .withTimestampAssigner((element, recordTimestamp) ->
> >>>                    element.eventTimestamp.toEpochSecond() * 1000);
> >>>
> >>>        final DataStream<Event> props = jj.assignTimestampsAndWatermarks(mew);
> >>>
> >>>        final KeyedStream<Event, String> praps =
> >>>            props.keyBy(e -> e.eventEntity.toString());
> >>>
> >>>        praps.window(TumblingEventTimeWindows.of(Time.hours(1)))
> >>>            .sum("something")
> >>>            .print("JAJ!");
> >>>
> >>>        env.execute("FlinkWikipediaHistoryTopEditors");
> >>>    }
> >>>}
> >>>
> >>>If you see any errors here, please tell me, this is sort of driving me
> >>>mad >_<.
> >>>
> >>>Best regards,
> >>>Teodor Spæren
> >>>
> >>>[1] https://github.com/rHermes/flink-question-001
> >>>
>
