+Eugene, Reuven who reviewed and implemented this code. They may have
opinions.

Note that changing the default filename policy would be
backwards-incompatible, so this would either need to go into 2.0.0 (and a
new RC3) or it would not go in.

On Thu, May 11, 2017 at 8:36 AM, Borisa Zivkovic <[email protected]> wrote:

> great JB, thanks
>
> I do not mind working on this - let's see if anyone else has additional
> input.
>
> cheers
>
> On Thu, 11 May 2017 at 16:28 Jean-Baptiste Onofré <[email protected]> wrote:
>
> > Got it.
> >
> > Yes, agreed. I think the PerWindowFilesPolicy could be the default, and
> > users could provide their own policy if they want to.
> >
> > Regards
> > JB
> >
> > On 05/11/2017 05:23 PM, Borisa Zivkovic wrote:
> > > Hi  JB,
> > >
> > > yes I saw that thread - I also copied your code but did not want to
> > > pollute it with my proposal :)
> > >
> > > Well ok maybe default FilePerWindow policy for windowedWrites in TextIO
> > > does not make sense - not sure TBH...
> > >
> > > But would it make sense to promote a version of PerWindowFiles from
> > > https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
> > > so that it is easier to provide some kind of PerWindowFiles filename
> > > policy?
> > >
> > >
> > > something like this (where the user does not have to write
> > > PerWindowFilesPolicy, it comes with Beam)
> > >
> > >
> > >
> > > .withFilenamePolicy(PerWindowFilesPolicy.withSuffix("mySuffix"))
> > > .withWindowedWrites()
> > > .withNumShards(1));
> > >
> > > not sure if this was already discussed...
> > >
> > > cheers
> > > Borisa
> > >
> > >
> > > On Thu, 11 May 2017 at 16:15 Jean-Baptiste Onofré <[email protected]> wrote:
> > >
> > >> Hi Borisa,
> > >>
> > >> You can take a look at the other thread ("Direct runner doesn't seem to
> > >> finalize checkpoint "quickly"").
> > >>
> > >> It's basically the same point ;)
> > >>
> > >> The default trigger (event-time) doesn't fire any data. I'm investigating
> > >> the element timestamp and watermark.
> > >>
> > >> I'm also playing with that, for instance:
> > >>
> > >>
> > >>
> > >> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
> > >>
> > >> When you use WindowedWrite, you have to provide a filename policy. We
> > >> could provide a default one, but I am not sure it would fit well (as it
> > >> depends a lot on the use case).
> > >>
> > >> Regards
> > >> JB
> > >>
> > >> On 05/11/2017 05:01 PM, Borisa Zivkovic wrote:
> > >>> Hi guys,
> > >>>
> > >>> just playing with reading data from PubSub and writing using TextIO.
> > >>>
> > >>> First thing is that it is very hard to get any output - a lot of temp
> > >>> files are written, but the final files are not always created.
> > >>>
> > >>> So, I am playing with triggers etc... If I do the following:
> > >>>
> > >>> PCollection<String> streamData = p.apply(
> > >>>     PubsubIO.readStrings().fromTopic(
> > >>>         "projects/" + PROJECT_NAME + "/topics/myTopic"));
> > >>>
> > >>> streamData.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
> > >>>         .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
> > >>>             .plusDelayOf(Duration.standardSeconds(3))))
> > >>>         .withAllowedLateness(Duration.ZERO)
> > >>>         .discardingFiredPanes())
> > >>>     .apply(TextIO.write().to("/tmp/abc").withWindowedWrites()
> > >>>         .withSuffix(".suff").withNumShards(10));
> > >>>
> > >>>     p.run();
> > >>>
> > >>> I would expect to see some files in /tmp/ with final results. Unless I
> > >>> add good triggers I usually do not get any data, only temp files in
> > >>> /temp/.beam/
> > >>>
> > >>> but sometimes when data should be written I get the following exception:
> > >>>
> > >>> Exception in thread "main"
> > >>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> > >>> java.lang.UnsupportedOperationException: There is no default policy for
> > >>> windowed file output. Please provide an explicit FilenamePolicy to generate
> > >>> filenames.
> > >>> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
> > >>> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
> > >>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
> > >>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> > >>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
> > >>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
> > >>> at Test.main(Test.java:50)
> > >>>
> > >>>
> > >>> Would it make sense to change TextIO so that it does not use
> > >>> DefaultFilenamePolicy only? In case there are windowedWrites and no
> > >>> filename policy was specified by the user, it could use a custom
> > >>> FilePerWindow policy automatically. I believe today TextIO always
> > >>> expects the user to specify a FilenamePolicy, right?
> > >>>
> > >>> Or maybe to have a FilePerWindow policy exposed as part of Beam - I
> > >>> believe today there are only implementations in tests and examples but
> > >>> nothing publicly visible, right?
> > >>>
> > >>>
> > >>>
> > >>> thanks
> > >>>
> > >>
> > >> --
> > >> Jean-Baptiste Onofré
> > >> [email protected]
> > >> http://blog.nanthrax.net
> > >> Talend - http://www.talend.com
> > >>
> > >
> >
> > --
> > Jean-Baptiste Onofré
> > [email protected]
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
> >
>
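
For readers following the thread, the per-window naming idea discussed above can be sketched without any Beam dependency. The helper below is a hypothetical illustration only: it is not Beam's FilenamePolicy API and not the code from the linked JmsToHdfs sample. The class name, method name, timestamp format, and zero-padded shard layout are all assumptions. It only shows how a window's bounds and a shard index might be combined into a distinct file name.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

/** Hypothetical helper for illustration; not part of Beam. */
final class PerWindowFilenames {
    // Compact UTC timestamp such as 20170511T150000Z, safe to embed in a file name.
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'", Locale.ROOT)
            .withZone(ZoneOffset.UTC);

    private PerWindowFilenames() {}

    /**
     * Builds prefix-windowStart-windowEnd-shard-of-numShards plus the suffix,
     * so every (window, shard) pair maps to a different name.
     */
    static String filenameFor(String prefix, Instant windowStart, Instant windowEnd,
                              int shard, int numShards, String suffix) {
        return String.format(Locale.ROOT, "%s-%s-%s-%05d-of-%05d%s",
            prefix, FORMAT.format(windowStart), FORMAT.format(windowEnd),
            shard, numShards, suffix);
    }
}
```

With the /tmp/abc prefix and .suff suffix from the pipeline quoted above, each 5-second window and shard would get its own file name. An actual policy would have to do the equivalent inside a Beam FilenamePolicy implementation.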
