Here is pseudocode (sorry) of what I am doing right now:

    PCollection<KV<String, String>> writtenFiles = dataStream
        .withFixedWindow(
            duration = 1H,
            trigger = AfterWatermark.pastEndOfWindow()
                .withLateFirings(AfterFirst.of(
                    AfterPane.elementCountAtLeast(lateCount),
                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(5 minutes)))
                .withEarlyFirings(AfterPane.elementCountAtLeast(maxBulkSize)),
            discardFiredPanes,
            lateness = 1 day)
        .saveWithDynamicDestinationsTo(fileIO)
        .getPerDestinationOutputFilenames()

    writtenFiles
        .groupBy(x -> x.getKey())
        .withFixedWindow(
            duration = 1H,
            trigger = AfterWatermark.pastEndOfWindow()
                .withLateFirings(AfterPane.elementCountAtLeast(1)),
            discardFiredPanes,
            lateness = 1 day)
        .map(x -> { println(x); x })

In the second window over the written files I observe EARLY fires whose group-by value iterator always contains exactly one file (propagated from the write-files result), while the ON_TIME fires are always empty. What am I missing here? How do I avoid getting the early fires one by one and instead get all written files in the ON_TIME fire of the window?
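In the actual Beam Java API, that second stage corresponds roughly to the sketch below (untested; the hourly window, one-day lateness and late-firing trigger mirror the pseudocode above, and the final ParDo just prints each grouped result):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.windowing.AfterPane;
    import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;

    // writtenFiles is the PCollection<KV<String, String>> coming out of
    // getPerDestinationOutputFilenames() in the first stage above.
    writtenFiles
        // Re-window the filenames into the same hourly windows: fire once at
        // the watermark, then once per late element, discarding fired panes.
        .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardHours(1)))
            .triggering(AfterWatermark.pastEndOfWindow()
                .withLateFirings(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(Duration.standardDays(1))
            .discardingFiredPanes())
        // Group all files written for a destination within the window.
        .apply(GroupByKey.<String, String>create())
        // Print each destination together with the files grouped in this pane.
        .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
          @ProcessElement
          public void process(ProcessContext c) {
            System.out.println(c.element());
          }
        }));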
On Sun, Jul 22, 2018 at 4:27 PM Stephan Ewen <se...@apache.org> wrote:

> For what it's worth, in Flink directly we found that this pattern is
> generally not a well-working one: windowing data in large windows in order
> to perform large bulk writes.
>
> Instead, the sinks (to file systems) write continuously (possibly across
> different destination files), ensure persistence at checkpoints, and can
> roll back the output to the previous checkpoint in a file-system-specific
> way. That way there is no data buffering in state (memory, RocksDB, etc.)
> at all, only metadata tracking.
>
> For bulk encoders (like Parquet), one needs an additional step to
> encode/compress when the specific destination file is done (if you think
> in Hadoop terms, that would be the "commit" step).
>
> On Sun, Jul 22, 2018 at 2:10 PM, Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
>
>> I looked into Wait.on(), but the doc says it waits until the window is
>> completely done, so it is not quite a fit for my case, as my lateness can
>> be a day or two and I would like to compact and publish hourly data
>> sooner.
>>
>> What I am thinking of is to write the triggered output under a different
>> location than the target. I will have lots of EARLY fires for the main
>> data and then some LATE fires. What I would like to do is observe all
>> EARLY fires for a window (ideally at the ON_TIME event-time fire) in one
>> group and move those files to the target dir by merging them. Observed
>> LATE fires would just be moved immediately, because that is not much data
>> and it does not hurt to keep it fragmented for now.
>>
>> The question is whether this makes sense and can be done with Beam.
>> FileIO returns a WriteFilesResult on which I can call
>> `getPerDestinationOutputFilenames()`, which returns a collection of KV
>> with the key being the destination and the value being a file which was
>> written. I tried to window it again with different triggers (no early
>> trigger) and group by key, but so far no luck: it never yields the
>> collection of files which were emitted as EARLY in the first window.
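(For clarity, the hookup described above is roughly the following in Beam Java; an untested sketch, where `fileIO` stands for the FileIO.writeDynamic() spec from the first mail of the thread and `data` is the windowed input collection:)

    import org.apache.beam.sdk.io.WriteFilesResult;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    // Applying the dynamic-destination write yields a WriteFilesResult
    // describing what was actually written.
    WriteFilesResult<String> writeResult = data.apply(fileIO);

    // One KV per written file: key = destination, value = the written file's name.
    PCollection<KV<String, String>> writtenFiles =
        writeResult.getPerDestinationOutputFilenames();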
>> On Fri, Jul 20, 2018 at 9:06 PM Raghu Angadi <rang...@google.com> wrote:
>>
>>> On Fri, Jul 20, 2018 at 2:58 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>> wrote:
>>>
>>>> Hm, that is an interesting idea, to make the write composite and merge
>>>> the files later. I do not know Beam well yet.
>>>> I will look into it and learn about the Wait.on() transform (wondering
>>>> how it will work with late fires). Thanks!
>>>>
>>>> But it keeps me thinking...
>>>> Does it make sense to have support for this in the SDK?
>>>> Is my use case that uncommon? Not a fit for Beam? How do others out
>>>> there do similar things?
>>>
>>> The SDK does allow it. Looks like you are running into scaling and
>>> memory limits with the amount of state stored in large windows. This is
>>> something that will improve. I am not familiar enough with the Flink
>>> runner to comment on specifics. I was mainly thinking of a workaround.
>>>
>>> Raghu.
>>>
>>>> On Thu, Jul 19, 2018 at 11:21 PM Raghu Angadi <rang...@google.com>
>>>> wrote:
>>>>
>>>>> One option (but it requires more code): write smaller files with
>>>>> frequent triggers to directory_X and, once the window properly closes,
>>>>> copy all the files into a single file in your own DoFn. This is
>>>>> certainly more code on your part, but it might be worth it. You can use
>>>>> the Wait.on() transform to run your finalizer DoFn right after the
>>>>> window that writes the smaller files closes.
>>>>>
>>>>> On Thu, Jul 19, 2018 at 2:43 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hey,
>>>>>>
>>>>>> I am looking for advice.
>>>>>>
>>>>>> I am trying to do stream processing with Beam on the Flink runtime:
>>>>>> reading data from Kafka, doing some processing with it (which is not
>>>>>> important here), and at the same time storing the consumed data to
>>>>>> history storage for archive and reprocessing, which is HDFS.
>>>>>>
>>>>>> Now, the part of writing batches to HDFS is giving me a hard time.
>>>>>> Logically, I want to do:
>>>>>>
>>>>>>     fileIO = FileIO.writeDynamic()
>>>>>>         .by(destinationFn)
>>>>>>         .via(AvroIO.sink(avroClass))
>>>>>>         .to(path)
>>>>>>         .withNaming(namingFn)
>>>>>>         .withTempDirectory(tmp)
>>>>>>         .withNumShards(shards)
>>>>>>
>>>>>>     data
>>>>>>         .withFixedWindow(1H, afterWatermarkTrigger, discardFiredPanes)
>>>>>>         .saveTo(fileIO)
>>>>>>
>>>>>> This write generates 3 operators in the Flink execution graph, which I
>>>>>> do not fully understand yet.
>>>>>>
>>>>>> Now, the problem is that I am not able to run this at scale.
>>>>>>
>>>>>> If I want to write files big enough not to end up with lots of files
>>>>>> on HDFS, I keep running into OOMs. With Flink, I use the RocksDB state
>>>>>> backend, and I was warned about this JIRA, which is probably related
>>>>>> to my OOM: https://issues.apache.org/jira/browse/FLINK-8297
>>>>>> Therefore, I need to trigger more often and in smaller batches, which
>>>>>> leads to too many files on HDFS.
>>>>>>
>>>>>> The question here is whether there is some path I do not see to make
>>>>>> this work (write bulks of data of my choosing to HDFS without running
>>>>>> into memory troubles). Also, keeping the whole window of data
>>>>>> designated for the filesystem write in state involves more IO.
>>>>>>
>>>>>> Thanks for any thoughts and guidelines,
>>>>>> Jozef
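For reference, the FileIO.writeDynamic() pseudocode in that first mail corresponds roughly to the Beam Java below. This is an untested sketch under a few assumptions: MyRecord stands for the generated Avro class (avroClass), destinations are plain Strings, and destinationFor, path, tmp and shards are placeholders for whatever the pipeline actually uses.

    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.AvroIO;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.WriteFilesResult;
    import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.joda.time.Duration;

    // Destination type is String, element type is the Avro record class.
    FileIO.Write<String, MyRecord> fileIO =
        FileIO.<String, MyRecord>writeDynamic()
            .by(record -> destinationFor(record))       // destinationFn
            .via(AvroIO.sink(MyRecord.class))           // avroClass
            .to(path)                                   // target directory
            .withNaming(dest -> FileIO.Write.defaultNaming(dest, ".avro"))  // namingFn
            .withDestinationCoder(StringUtf8Coder.of())
            .withTempDirectory(tmp)
            .withNumShards(shards);

    // Hourly event-time windows, firing once when the watermark passes the
    // end of the window, discarding fired panes. Allowed lateness follows the
    // one-day value used in the later pseudocode.
    WriteFilesResult<String> written = data
        .apply(Window.<MyRecord>into(FixedWindows.of(Duration.standardHours(1)))
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.standardDays(1))
            .discardingFiredPanes())
        .apply(fileIO);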