(we should probably throw an exception at construction time in the various FileBasedSinks if you use WindowedWrites and the default filename policy though, that's a no-brainer and it's backwards-compatible.)
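A minimal plain-Java sketch of that construction-time check. `SinkConfigSketch` and its nested `FilenamePolicy` are hypothetical stand-ins, not Beam's `FileBasedSink` API. The point is only that the "windowed writes + default policy" combination is rejected when the object is built, instead of surfacing later as the `UnsupportedOperationException` thrown from `waitUntilFinish`:

```java
import java.util.Objects;

/**
 * Hypothetical, simplified sink configuration (NOT Beam's FileBasedSink API).
 * It only illustrates rejecting "windowed writes + default filename policy"
 * when the object is built, instead of when the pipeline runs.
 */
final class SinkConfigSketch {

  /** Hypothetical stand-in for a filename policy. */
  interface FilenamePolicy {
    String filename(String windowLabel, int shard, int numShards);
  }

  private final String outputPrefix;
  private final boolean windowedWrites;
  private final FilenamePolicy policy; // null means "use the default policy"

  private SinkConfigSketch(String outputPrefix, boolean windowedWrites, FilenamePolicy policy) {
    this.outputPrefix = outputPrefix;
    this.windowedWrites = windowedWrites;
    this.policy = policy;
  }

  /** Validates eagerly, so the misconfiguration surfaces at construction time. */
  static SinkConfigSketch of(String outputPrefix, boolean windowedWrites, FilenamePolicy policy) {
    Objects.requireNonNull(outputPrefix, "outputPrefix");
    if (windowedWrites && policy == null) {
      throw new IllegalArgumentException(
          "Windowed writes require an explicit FilenamePolicy;"
              + " the default policy cannot name per-window files.");
    }
    return new SinkConfigSketch(outputPrefix, windowedWrites, policy);
  }

  boolean usesDefaultPolicy() {
    return policy == null;
  }

  boolean windowedWrites() {
    return windowedWrites;
  }

  String outputPrefix() {
    return outputPrefix;
  }

  public static void main(String[] args) {
    // Unwindowed writes with the default policy keep working (backwards-compatible).
    SinkConfigSketch plain = SinkConfigSketch.of("/tmp/abc", false, null);
    System.out.println("default policy accepted: " + plain.usesDefaultPolicy());

    // Windowed writes without an explicit policy fail immediately.
    try {
      SinkConfigSketch.of("/tmp/abc", true, null);
    } catch (IllegalArgumentException e) {
      System.out.println("rejected at construction: " + e.getMessage());
    }
  }
}
```

Because the check only fires for the combination that can never work anyway, existing unwindowed users see no change in behavior.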
On Thu, May 11, 2017 at 8:41 AM, Dan Halperin <[email protected]> wrote:

> +Eugene, Reuven who reviewed and implemented this code. They may have
> opinions.
>
> Note that changing the default filename policy would be
> backwards-incompatible, so this would either need to go into 2.0.0 (and a
> new RC3) or it would not go in.
>
> On Thu, May 11, 2017 at 8:36 AM, Borisa Zivkovic <
> [email protected]> wrote:
>
>> great JB, thanks
>>
>> I do not mind working on this - let's see if anyone else has additional
>> input.
>>
>> cheers
>>
>> On Thu, 11 May 2017 at 16:28 Jean-Baptiste Onofré <[email protected]>
>> wrote:
>>
>> > Got it.
>> >
>> > Yes, agreed, I think the PerWindowFilesPolicy could be the default, and
>> > the user could provide their own policy if they want to.
>> >
>> > Regards
>> > JB
>> >
>> > On 05/11/2017 05:23 PM, Borisa Zivkovic wrote:
>> > > Hi JB,
>> > >
>> > > yes I saw that thread - I also copied your code but did not want to
>> > > pollute it with my proposal :)
>> > >
>> > > Well ok, maybe a default FilePerWindow policy for windowedWrites in
>> > > TextIO does not make sense - not sure TBH...
>> > >
>> > > But would it make sense to promote a version of PerWindowFiles from
>> > > https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
>> > > so that it is easier to provide some kind of PerWindowFiles filename
>> > > policy?
>> > >
>> > > Something like this (where the user does not have to write
>> > > PerWindowFilesPolicy; it comes with Beam):
>> > >
>> > >     .withFilenamePolicy(PerWindowFilesPolicy.withSuffix("mySuffix"))
>> > >     .withWindowedWrites()
>> > >     .withNumShards(1));
>> > >
>> > > not sure if this was already discussed...
>> > >
>> > > cheers
>> > > Borisa
>> > >
>> > > On Thu, 11 May 2017 at 16:15 Jean-Baptiste Onofré <[email protected]>
>> > > wrote:
>> > >
>> > >> Hi Borisa,
>> > >>
>> > >> You can take a look at the other thread ("Direct runner doesn't seem
>> > >> to finalize checkpoint "quickly"").
>> > >>
>> > >> It's basically the same point ;)
>> > >>
>> > >> The default trigger (event-time) doesn't fire any data. I'm
>> > >> investigating the element timestamp and watermark.
>> > >>
>> > >> I'm also playing with that, for instance:
>> > >>
>> > >> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
>> > >>
>> > >> When you use WindowedWrite, you have to provide a filename policy. We
>> > >> could provide a default one, but I'm not sure it would fit well (as it
>> > >> depends a lot on the use case).
>> > >>
>> > >> Regards
>> > >> JB
>> > >>
>> > >> On 05/11/2017 05:01 PM, Borisa Zivkovic wrote:
>> > >>> Hi guys,
>> > >>>
>> > >>> just playing with reading data from PubSub and writing using TextIO.
>> > >>>
>> > >>> First thing is that it is very hard to get any output - a lot of temp
>> > >>> files get written, but the final files are not always created.
>> > >>>
>> > >>> So, I am playing with triggers etc...
>> > >>> If I do the following:
>> > >>>
>> > >>>     PCollection<String> streamData = p.apply(
>> > >>>         PubsubIO.readStrings().fromTopic("projects/" + PROJECT_NAME + "/topics/myTopic"));
>> > >>>
>> > >>>     streamData.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
>> > >>>             .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
>> > >>>                 .plusDelayOf(Duration.standardSeconds(3))))
>> > >>>             .withAllowedLateness(Duration.ZERO)
>> > >>>             .discardingFiredPanes())
>> > >>>         .apply(TextIO.write().to("/tmp/abc").withWindowedWrites()
>> > >>>             .withSuffix(".suff").withNumShards(10));
>> > >>>
>> > >>>     p.run();
>> > >>>
>> > >>> I would expect to see some files in /tmp/ with final results. Unless I
>> > >>> add good triggers, I usually do not get any data, only temp files in
>> > >>> /temp/.beam/
>> > >>>
>> > >>> But sometimes, when data should be written, I get the following
>> > >>> exception:
>> > >>>
>> > >>> Exception in thread "main"
>> > >>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> > >>> java.lang.UnsupportedOperationException: There is no default policy for
>> > >>> windowed file output. Please provide an explicit FilenamePolicy to
>> > >>> generate filenames.
>> > >>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
>> > >>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
>> > >>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
>> > >>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
>> > >>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
>> > >>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
>> > >>>     at Test.main(Test.java:50)
>> > >>>
>> > >>> Would it make sense to change TextIO so that it does not always use
>> > >>> DefaultFilenamePolicy, but, when there are windowedWrites and no
>> > >>> filename policy was specified by the user, automatically uses a custom
>> > >>> FilePerWindow policy instead? I believe today TextIO always expects the
>> > >>> user to specify a FilenamePolicy, right?
>> > >>>
>> > >>> Or maybe a FilePerWindow policy could be exposed as part of Beam - I
>> > >>> believe today there are only implementations in tests and examples but
>> > >>> nothing publicly visible, right?
>> > >>>
>> > >>> thanks
>> > >>>
>> > >>
>> > >> --
>> > >> Jean-Baptiste Onofré
>> > >> [email protected]
>> > >> http://blog.nanthrax.net
>> > >> Talend - http://www.talend.com
>> > >>
>> > >
>> >
>> > --
>> > Jean-Baptiste Onofré
>> > [email protected]
>> > http://blog.nanthrax.net
>> > Talend - http://www.talend.com
>> >
>> >
>
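To make the proposed `PerWindowFilesPolicy.withSuffix("mySuffix")` idea concrete, here is a minimal plain-Java sketch of per-window file naming. `PerWindowNamingSketch` is hypothetical and does not implement Beam's `FilenamePolicy` interface; it takes the output prefix explicitly because there is no `.to(...)` here, and the timestamp format and `-SSSSS-of-NNNNN` shard layout are illustrative choices. It assigns an element timestamp to a 5-second fixed window (the usual floor-to-window-size arithmetic with zero offset) and derives one filename per (window, shard), so outputs from different windows cannot collide:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/**
 * Hypothetical per-window naming scheme in the spirit of the proposed
 * PerWindowFilesPolicy.withSuffix(...). Plain Java, not Beam's FilenamePolicy API.
 */
final class PerWindowNamingSketch {

  // UTC, colon-free timestamps so the names are valid on most filesystems.
  private static final DateTimeFormatter FORMAT =
      DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss").withZone(ZoneOffset.UTC);

  private final String prefix;
  private final String suffix;

  private PerWindowNamingSketch(String prefix, String suffix) {
    this.prefix = prefix;
    this.suffix = suffix;
  }

  /** Unlike the proposal, the prefix is passed explicitly here. */
  static PerWindowNamingSketch withSuffix(String prefix, String suffix) {
    return new PerWindowNamingSketch(prefix, suffix);
  }

  /** Start of the fixed window (zero offset) that contains timestampMillis. */
  static long windowStart(long timestampMillis, long sizeMillis) {
    return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
  }

  /** One name per (window, shard): files from different windows never collide. */
  String filename(long windowStartMillis, long windowEndMillis, int shard, int numShards) {
    return String.format("%s-%s-%s-%05d-of-%05d%s",
        prefix,
        FORMAT.format(Instant.ofEpochMilli(windowStartMillis)),
        FORMAT.format(Instant.ofEpochMilli(windowEndMillis)),
        shard, numShards, suffix);
  }

  public static void main(String[] args) {
    long size = 5_000L; // 5-second windows, like FixedWindows.of(Duration.standardSeconds(5))
    PerWindowNamingSketch naming = withSuffix("/tmp/abc", ".suff");
    long start = windowStart(7_250L, size); // element at t = 7.25 s falls in [5 s, 10 s)
    // prints /tmp/abc-19700101T000005-19700101T000010-00003-of-00010.suff
    System.out.println(naming.filename(start, start + size, 3, 10));
  }
}
```

Putting both window bounds in the name is what a default policy lacks: with windowed writes, every fired pane needs a distinct, predictable name.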
