Hi JB, yes I saw that thread - I also copied your code but did not want to pollute it with my proposal :)
Well ok maybe default FilePerWindow policy for windowedWrites in TextIO does
not make sense - not sure TBH...

But would it make sense to promote a version of PerWindowFiles from

https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java

so that it is easier to provide some kind of PerWindowFiles filename policy..

something like (where user does not have to write PerWindowFilesPolicy, it
comes with Beam)

    .withFilenamePolicy(PerWindowFilesPolicy.withSuffix("mySuffix"))
    .withWindowedWrites()
    .withNumShards(1));

not sure if this was already discussed...

cheers
Borisa

On Thu, 11 May 2017 at 16:15 Jean-Baptiste Onofré <[email protected]> wrote:

> Hi Borisa,
>
> You can take a look about the other thread ("Direct runner doesn't seem to
> finalize checkpoint "quickly"").
>
> It's basically the same point ;)
>
> The default trigger (event-time) doesn't fire any data. I'm investigating
> the element timestamp and watermark.
>
> I'm also playing with that, for instance:
>
> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
>
> When you use WindowedWrite, you have to provide a filename policy. We could
> provide a default one, but not sure it will fit fine (as it depends a lot
> about the use cases).
>
> Regards
> JB
>
> On 05/11/2017 05:01 PM, Borisa Zivkovic wrote:
> > Hi guys,
> >
> > just playing with reading data from PubSub and writing using TextIO.
> >
> > First thing is that it is very hard to get any output - a lot of temp
> > files written but not always would get final files created.
> >
> > So, I am playing with triggers etc...
> >
> > If I do following
> >
> > PCollection<String> streamData = p.apply(
> >     PubsubIO.readStrings().fromTopic("projects/"+ PROJECT_NAME +
> >         "/topics/myTopic"));
> >
> > streamData.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
> >     .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(3))))
> >     .withAllowedLateness(Duration.ZERO)
> >     .discardingFiredPanes())
> >     .apply(TextIO.write().to("/tmp/abc").withWindowedWrites()
> >         .withSuffix(".suff").withNumShards(10));
> >
> > p.run();
> >
> > I would expect to see some files in /tmp/ with final results.. unless I
> > add good triggers I usually do not get any data.. only temp files in
> > /temp/.beam/
> >
> > but sometimes when data should be written I get following exception
> >
> > Exception in thread "main"
> > org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> > java.lang.UnsupportedOperationException: There is no default policy for
> > windowed file output. Please provide an explicit FilenamePolicy to
> > generate filenames.
> >   at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
> >   at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
> >   at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
> >   at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> >   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
> >   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
> >   at Test.main(Test.java:50)
> >
> > Would it make sense to change TextIO so that it does not
> > use DefaultFilenamePolicy only - but in case there are windowedWrites and
> > no filename policy was specified by user it could actually use custom
> > FilePerWindow policy automatically. I believe today TextIO always expects
> > user to specify FilenamePolicy, right?
> >
> > Or maybe to have FilePerWindow policy exposed as part of Beam - I believe
> > today there are only implementations in tests and examples but nothing
> > publicly visible, right?
> >
> > thanks
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
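
To make the per-window naming idea in this thread a bit more concrete, here is a rough, stdlib-only sketch of the kind of filename a bundled per-window policy might generate. It deliberately does not use the Beam API: the class PerWindowFilenameSketch, the method filenameFor, and the "prefix-start-end-shard-of-numShards+suffix" layout are all made up for illustration, and a real policy would hook into whatever FilenamePolicy contract TextIO expects.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class PerWindowFilenameSketch {

    // Compact UTC timestamp without ':' so the result is safe on most filesystems.
    private static final DateTimeFormatter WINDOW_FORMAT =
        DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'").withZone(ZoneOffset.UTC);

    /**
     * Hypothetical helper (not part of Beam): builds one output filename per
     * (window, shard) pair from the pieces a windowed write has available.
     */
    static String filenameFor(String prefix, Instant windowStart, Instant windowEnd,
                              int shardNumber, int numShards, String suffix) {
        return String.format("%s-%s-%s-%05d-of-%05d%s",
            prefix,
            WINDOW_FORMAT.format(windowStart),
            WINDOW_FORMAT.format(windowEnd),
            shardNumber,
            numShards,
            suffix);
    }

    public static void main(String[] args) {
        // A 5-second fixed window, matching the FixedWindows size in the pipeline above.
        Instant start = Instant.parse("2017-05-11T16:15:00Z");
        Instant end = start.plusSeconds(5);
        System.out.println(filenameFor("/tmp/abc", start, end, 0, 10, ".suff"));
    }
}
```

The point is only that everything needed for a sensible default (prefix, window bounds, shard index, suffix) is already known at write time, so the user would just pick the suffix.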
