+Eugene, Reuven, who reviewed and implemented this code. They may have opinions.
Note that changing the default filename policy would be backwards-incompatible, so this would either need to go into 2.0.0 (and a new RC3) or it would not go in.

On Thu, May 11, 2017 at 8:36 AM, Borisa Zivkovic wrote:
> great JB, thanks
>
> I do not mind working on this - let's see if anyone else has additional
> input.
>
> cheers
>
> On Thu, 11 May 2017 at 16:28 Jean-Baptiste Onofré wrote:
>
> > Got it.
> >
> > Yes, agree, I think the PerWindowFilesPolicy could be the default and let
> > the user provides its own policy if he wants to.
> >
> > Regards
> > JB
> >
> > On 05/11/2017 05:23 PM, Borisa Zivkovic wrote:
> > > Hi JB,
> > >
> > > yes I saw that thread - I also copied your code but did not want to
> > > pollute it with my proposal :)
> > >
> > > Well ok maybe default FilePerWindow policy for windowedWrites in TextIO
> > > does not make sense - not sure TBH...
> > >
> > > But would it make sense to promote a version of PerWindowFiles from
> > > https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
> > > so that it is easier to provide some kind of PerWindowFiles filename
> > > policy..
> > >
> > > something like (where user does not have to write PerWindowFilesPolicy,
> > > it comes with Beam)
> > >
> > >     .withFilenamePolicy(PerWindowFilesPolicy.withSuffix("mySuffix"))
> > >     .withWindowedWrites()
> > >     .withNumShards(1));
> > >
> > > not sure if this was already discussed...
> > >
> > > cheers
> > > Borisa
> > >
> > > On Thu, 11 May 2017 at 16:15 Jean-Baptiste Onofré wrote:
> > >
> > >> Hi Borisa,
> > >>
> > >> You can take a look about the other thread ("Direct runner doesn't
> > >> seem to finalize checkpoint "quickly"").
> > >>
> > >> It's basically the same point ;)
> > >>
> > >> The default trigger (event-time) doesn't fire any data.
> > >> I'm investigating the element timestamp and watermark.
> > >>
> > >> I'm also playing with that, for instance:
> > >>
> > >> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
> > >>
> > >> When you use WindowedWrite, you have to provide a filename policy. We
> > >> could provide a default one, but not sure it will fit fine (as it
> > >> depends a lot about the use cases).
> > >>
> > >> Regards
> > >> JB
> > >>
> > >> On 05/11/2017 05:01 PM, Borisa Zivkovic wrote:
> > >>> Hi guys,
> > >>>
> > >>> just playing with reading data from PubSub and writing using TextIO.
> > >>>
> > >>> First thing is that it is very hard to get any output - a lot of temp
> > >>> files written but not always would get final files created.
> > >>>
> > >>> So, I am playing with triggers etc... If I do following
> > >>>
> > >>> PCollection<String> streamData = p.apply(
> > >>>     PubsubIO.readStrings().fromTopic("projects/" + PROJECT_NAME + "/topics/myTopic"));
> > >>>
> > >>> streamData.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
> > >>>         .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
> > >>>             .plusDelayOf(Duration.standardSeconds(3))))
> > >>>         .withAllowedLateness(Duration.ZERO)
> > >>>         .discardingFiredPanes())
> > >>>     .apply(TextIO.write().to("/tmp/abc").withWindowedWrites()
> > >>>         .withSuffix(".suff").withNumShards(10));
> > >>>
> > >>> p.run();
> > >>>
> > >>> I would expect to see some files in /tmp/ with final results.. unless I
> > >>> add good triggers I usually do not get any data..
> > >>> only temp files in /temp/.beam/
> > >>>
> > >>> but sometimes when data should be written I get following exception
> > >>>
> > >>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> > >>> java.lang.UnsupportedOperationException: There is no default policy for
> > >>> windowed file output. Please provide an explicit FilenamePolicy to
> > >>> generate filenames.
> > >>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
> > >>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
> > >>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
> > >>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> > >>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
> > >>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
> > >>>     at Test.main(Test.java:50)
> > >>>
> > >>> Would it make sense to change TextIO so that it does not
> > >>> use DefaultFilenamePolicy only - but in case there are windowedWrites and
> > >>> no filename policy was specified by user it could actually use custom
> > >>> FilePerWindow policy automatically. I believe today TextIO always expects
> > >>> user to specify FilenamePolicy, right?
> > >>>
> > >>> Or maybe to have FilePerWindow policy exposed as part of Beam - I believe
> > >>> today there are only implementations in tests and examples but nothing
> > >>> publicly visible, right?
> > >>>
> > >>> thanks
> > >>>
> > >>
> > >> --
> > >> Jean-Baptiste Onofré
> > >> http://blog.nanthrax.net
> > >> Talend - http://www.talend.com
> > >>
> > >
> >
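For readers following the PerWindowFilesPolicy idea, here is a minimal sketch of the naming logic such a policy would need: one output filename per window and shard, built from the window bounds, the shard index and a user-supplied suffix. It is plain Java with no Beam dependency. The class `PerWindowFilenames`, its `format` method and the `prefix-<windowStart>-<windowEnd>-<shard>-of-<numShards><suffix>` layout are hypothetical choices for illustration, not an existing Beam API.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/**
 * Hypothetical helper (not part of Beam): builds a per-window output filename
 * from the window bounds, the shard index and a user-supplied suffix.
 */
public final class PerWindowFilenames {

  // UTC timestamps; ':' is replaced by '-' so the name is filesystem-friendly.
  private static final DateTimeFormatter FORMAT =
      DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH-mm-ss").withZone(ZoneOffset.UTC);

  private PerWindowFilenames() {}

  /** Returns e.g. "abc-2017-05-11T16-00-00-2017-05-11T16-00-05-00003-of-00010.suff". */
  public static String format(
      String prefix, Instant windowStart, Instant windowEnd,
      int shard, int numShards, String suffix) {
    if (shard < 0 || shard >= numShards) {
      throw new IllegalArgumentException("shard must be in [0, numShards)");
    }
    return String.format("%s-%s-%s-%05d-of-%05d%s",
        prefix, FORMAT.format(windowStart), FORMAT.format(windowEnd),
        shard, numShards, suffix);
  }

  public static void main(String[] args) {
    // A 5-second fixed window, as in the pipeline above, written with 10 shards.
    Instant start = Instant.parse("2017-05-11T16:00:00Z");
    Instant end = start.plusSeconds(5);
    for (int shard = 0; shard < 2; shard++) {
      System.out.println(format("abc", start, end, shard, 10, ".suff"));
    }
  }
}
```

Plugging this into `TextIO.write().withWindowedWrites()` would still mean wrapping it in a `FileBasedSink.FilenamePolicy` subclass, whose method signatures have changed between Beam releases, so check the Javadoc for the version in use. Also note that with a repeating trigger like the one in the pipeline above, a single window can fire several panes; a real policy would probably need the pane index (from `PaneInfo`) in the name too, or later panes would overwrite earlier files.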
