Hi JB,

Yes, I saw that thread. I also copied your code but did not want to pollute
it with my proposal :)

Well, OK, maybe a default FilePerWindow policy for windowed writes in TextIO
does not make sense; I'm not sure, TBH...

But would it make sense to promote a version of PerWindowFiles from
https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
into Beam itself, so that it is easier to get some kind of per-window
filename policy?
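
A per-window policy essentially names each output file after the window its
elements belong to, so it needs each element's window bounds. As a rough
plain-Java sketch (not Beam code; the class and method names are made up for
illustration), this is how a fixed window such as
FixedWindows.of(Duration.standardSeconds(5)) can be derived from an element
timestamp, assuming the default zero offset:

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration, not Beam code: assigns a timestamp to an
// epoch-aligned fixed window [start, end), assuming a zero offset.
public class FixedWindowSketch {

    /** Half-open window [start, end). */
    static final class Window {
        final Instant start;
        final Instant end;

        Window(Instant start, Instant end) {
            this.start = start;
            this.end = end;
        }
    }

    static Window assign(Instant timestamp, Duration size) {
        long sizeMillis = size.toMillis();
        long ts = timestamp.toEpochMilli();
        // floorMod keeps the window start correct for pre-epoch timestamps too.
        long start = ts - Math.floorMod(ts, sizeMillis);
        return new Window(Instant.ofEpochMilli(start), Instant.ofEpochMilli(start + sizeMillis));
    }

    public static void main(String[] args) {
        Window w = assign(Instant.parse("2017-05-11T15:01:07.250Z"), Duration.ofSeconds(5));
        // Prints: 2017-05-11T15:01:05Z .. 2017-05-11T15:01:10Z
        System.out.println(w.start + " .. " + w.end);
    }
}
```

Every element stamped from 15:01:05 up to (but not including) 15:01:10 lands
in the same window, so a per-window policy would write them to the same set
of files.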


Something like this, where the user does not have to write
PerWindowFilesPolicy because it comes with Beam:

streamData.apply(TextIO.write()
    .withFilenamePolicy(PerWindowFilesPolicy.withSuffix("mySuffix"))
    .withWindowedWrites()
    .withNumShards(1));
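
To make the proposal a bit more concrete, here is a hypothetical plain-Java
sketch of the kind of name such a PerWindowFilesPolicy could generate: one
file per (window, shard), with the window bounds in the name. The helper
perWindowFilename and the exact name format are assumptions for
illustration, loosely modeled on the PerWindowFiles approach, not an
existing Beam API:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical naming helper, not a Beam API: shows what a built-in
// per-window filename policy could produce for each (window, shard) pair.
public class PerWindowNaming {

    // UTC and no colons, so the names are safe on most filesystems.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss").withZone(ZoneOffset.UTC);

    static String perWindowFilename(String prefix, Instant windowStart, Instant windowEnd,
                                    int shard, int numShards, String suffix) {
        if (shard < 0 || shard >= numShards) {
            throw new IllegalArgumentException("shard must be in [0, numShards)");
        }
        // prefix-<windowStart>-<windowEnd>-<shard>-of-<numShards><suffix>
        return String.format("%s-%s-%s-%05d-of-%05d%s",
                prefix, FORMAT.format(windowStart), FORMAT.format(windowEnd),
                shard, numShards, suffix);
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2017-05-11T15:01:05Z");
        Instant end = start.plusSeconds(5);
        // Prints: abc-20170511T150105-20170511T150110-00000-of-00001.mySuffix
        System.out.println(perWindowFilename("abc", start, end, 0, 1, ".mySuffix"));
    }
}
```

One caveat: with a repeating trigger like the one in the original pipeline,
a window can fire several panes, so a real policy would probably also need
the pane index in the name to avoid overwriting files. That is part of why a
single default may not fit every use case.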

Not sure if this was already discussed...

Cheers,
Borisa


On Thu, 11 May 2017 at 16:15 Jean-Baptiste Onofré wrote:

> Hi Borisa,
>
> You can take a look at the other thread ("Direct runner doesn't seem to
> finalize checkpoint "quickly"").
>
> It's basically the same point ;)
>
> The default (event-time) trigger doesn't fire, so no data is output. I'm
> investigating the element timestamps and the watermark.
>
> I'm also playing with that, for instance:
>
> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
>
> When you use windowed writes, you have to provide a filename policy. We
> could provide a default one, but I'm not sure it would fit well (it depends
> a lot on the use case).
>
> Regards
> JB
>
> On 05/11/2017 05:01 PM, Borisa Zivkovic wrote:
> > Hi guys,
> >
> > I'm just playing with reading data from PubSub and writing it using TextIO.
> >
> > The first thing is that it is very hard to get any output: a lot of temp
> > files are written, but the final files are not always created.
> >
> > So, I am playing with triggers, etc. If I do the following:
> >
> > PCollection<String> streamData = p.apply(
> >         PubsubIO.readStrings().fromTopic("projects/" + PROJECT_NAME +
> >             "/topics/myTopic"));
> >
> > streamData.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
> >             .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
> >                 .plusDelayOf(Duration.standardSeconds(3))))
> >             .withAllowedLateness(Duration.ZERO)
> >             .discardingFiredPanes())
> >     .apply(TextIO.write().to("/tmp/abc").withWindowedWrites()
> >         .withSuffix(".suff").withNumShards(10));
> >
> > p.run();
> >
> > I would expect to see some files in /tmp/ with the final results. Unless I
> > add good triggers, I usually do not get any data, only temp files in
> > /temp/.beam/
> >
> > But sometimes, when data should be written, I get the following exception:
> >
> > Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> > java.lang.UnsupportedOperationException: There is no default policy for windowed file output. Please provide an explicit FilenamePolicy to generate filenames.
> >     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
> >     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
> >     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
> >     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> >     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
> >     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
> >     at Test.main(Test.java:50)
> >
> >
> > Would it make sense to change TextIO so that it does not use only
> > DefaultFilenamePolicy, but, when there are windowed writes and no filename
> > policy was specified by the user, automatically uses a custom FilePerWindow
> > policy instead? I believe that today TextIO always expects the user to
> > specify a FilenamePolicy for windowed writes, right?
> >
> > Or maybe a FilePerWindow policy could be exposed as part of Beam. I believe
> > that today there are only implementations in tests and examples, but
> > nothing publicly visible, right?
> >
> > Thanks
> >
>
> --
> Jean-Baptiste Onofré
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
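
For reference, here is a deliberately simplified model (plain Java, not
Beam's actual trigger implementation; all names are made up) of the
difference between the default event-time trigger JB mentions and the
processing-time trigger in the original pipeline. It ignores late data,
allowed lateness and repeated firings. One plausible reason for seeing no
output, which this thread does not confirm, is a watermark that never
advances past the end of the window:

```java
import java.time.Duration;
import java.time.Instant;

// Simplified model, not Beam code, of when a pane fires for a single window.
public class TriggerSketch {

    // Default trigger (simplified): fires once the event-time watermark
    // has reached the end of the window.
    static boolean defaultTriggerFires(Instant watermark, Instant windowEnd) {
        return !watermark.isBefore(windowEnd);
    }

    // Processing-time trigger (simplified): fires once `delay` has elapsed
    // since the first element of the pane arrived, regardless of the watermark.
    static boolean processingTimeTriggerFires(Instant now, Instant firstElementArrival, Duration delay) {
        return !now.isBefore(firstElementArrival.plus(delay));
    }

    public static void main(String[] args) {
        Instant windowEnd = Instant.parse("2017-05-11T15:01:10Z");
        // A watermark stuck before the window end means the default trigger never fires...
        Instant stuckWatermark = Instant.parse("2017-05-11T15:00:00Z");
        System.out.println(defaultTriggerFires(stuckWatermark, windowEnd)); // false
        // ...while the processing-time trigger fires 3 s after the first element arrives.
        Instant arrival = Instant.parse("2017-05-11T15:01:06Z");
        System.out.println(processingTimeTriggerFires(arrival.plusSeconds(3), arrival, Duration.ofSeconds(3))); // true
    }
}
```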
