+1 for DefaultFilenamePolicy being able to understand basic windowing. That is
probably the most user-friendly way, and it would cover most needs; in case of
special needs, users can provide their own policy.

Another alternative would be to have a new class called
DefaultWindowedFilenamePolicy in the package org.apache.beam.sdk.io.

Either of those would make it easier for Beam users.

So, someone needs to decide how we want to do this, and if you want, I can
work on it.

cheers

On Fri, 12 May 2017 at 08:18 Reuven Lax <[email protected]> wrote:

> I believe that for most windows there is a standard stringification.
> However, I think we could allow the user to inject a window formatter for
> cases where there is no good default (e.g. where the window is a
> complicated user-defined type and toString() isn't good enough).
>
> Alternatively, if we don't want to allow formatters, we could make
> DefaultFilenamePolicy work with default stringifications of well-known
> windows (fixed, sliding, sessions, etc.), and just use toString() for the
> remaining window types. Users that have weird custom window types can
> always write their own FilenamePolicy.
>
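A minimal plain-Java sketch of the approach described above: an optional user-injected formatter, default strings for well-known window types, and toString() as the fallback. IntervalWindow, GlobalWindow and WindowNames are hypothetical stand-ins, not Beam's classes; the compact UTC timestamp format and the empty string for the global window are assumptions. Records and pattern matching need Java 16+.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Optional;
import java.util.function.Function;

final class WindowNames {
    // Hypothetical stand-in for a window with explicit bounds (fixed, sliding, session windows).
    record IntervalWindow(Instant start, Instant end) {}

    // Hypothetical stand-in for the single global window.
    enum GlobalWindow { INSTANCE }

    // Filename-safe UTC timestamps without ':' characters, e.g. 20170512T080000Z (assumed format).
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'").withZone(ZoneOffset.UTC);

    private WindowNames() {}

    /**
     * Returns a filename-friendly string for a window. A user-supplied formatter,
     * if present, always wins; well-known window types get a default string; any
     * other window type falls back to toString().
     */
    static String format(Object window, Optional<Function<Object, String>> userFormatter) {
        if (userFormatter.isPresent()) {
            return userFormatter.get().apply(window);
        }
        if (window instanceof GlobalWindow) {
            return ""; // assumption: nothing useful to put in the filename for the global window
        }
        if (window instanceof IntervalWindow w) {
            return FORMAT.format(w.start()) + "-" + FORMAT.format(w.end());
        }
        return String.valueOf(window); // custom window types: plain toString()
    }

    public static void main(String[] args) {
        IntervalWindow w = new IntervalWindow(
                Instant.parse("2017-05-12T08:00:00Z"), Instant.parse("2017-05-12T08:00:05Z"));
        Function<Object, String> custom = x -> "custom";
        System.out.println(format(w, Optional.empty()));    // 20170512T080000Z-20170512T080005Z
        System.out.println(format(w, Optional.of(custom))); // custom
    }
}
```

Keeping the formatter optional means the common window types need no configuration, while users with unusual windows can still override the default without writing a whole FilenamePolicy.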
> On Thu, May 11, 2017 at 4:24 PM, Robert Bradshaw <[email protected]> wrote:
>
> > I like the idea of WWW and PPP, assuming there is a standard enough
> > stringification of windows and panes. However, we may want to elide
> > adjacent tokens if the window is global or the pane is the only
> > possible (or first?) one, to avoid writing things like
> > -0000-of-0005---.
> >
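Robert's elision idea can be sketched as a replacement that drops a token together with its leading '-' when the value is empty (for example, the global window or a pane that needs no label). TokenElision is a hypothetical helper, the rule that an empty value removes the preceding separator is an assumption, and the template is assumed to already have its shard tokens expanded.

```java
final class TokenElision {
    private TokenElision() {}

    /**
     * Replaces the first occurrence of token in the template with value.
     * If value is empty, the token is removed together with one preceding '-',
     * so "-WWW" disappears instead of leaving a dangling separator.
     */
    static String replaceOrElide(String template, String token, String value) {
        int start = template.indexOf(token);
        if (start < 0) {
            return template; // token not present: nothing to do
        }
        int end = start + token.length();
        if (value.isEmpty()) {
            int cut = (start > 0 && template.charAt(start - 1) == '-') ? start - 1 : start;
            return template.substring(0, cut) + template.substring(end);
        }
        return template.substring(0, start) + value + template.substring(end);
    }

    /** Expands WWW and PPP, eliding either one whose value is empty. */
    static String expand(String template, String window, String pane) {
        return replaceOrElide(replaceOrElide(template, "WWW", window), "PPP", pane);
    }

    public static void main(String[] args) {
        System.out.println(expand("out-0000-of-0005-WWW-PPP", "", ""));   // out-0000-of-0005
        System.out.println(expand("out-0000-of-0005-WWW-PPP", "w1", "")); // out-0000-of-0005-w1
    }
}
```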
> > On Thu, May 11, 2017 at 8:47 AM, Reuven Lax <[email protected]> wrote:
> > > Another idea: we can extend the existing pattern that
> > > DefaultFilenamePolicy understands to include windows.
> > >
> > > Today it replaces SSS with the shard and NNN with the number of shards
> > > (so many templates contain -SSS-of-NNN). We could also have it recognize
> > > WWW and PPP, for the window and the pane respectively.
> > >
> > > I believe this would be a backwards-compatible change. We do not need to
> > > change any existing interfaces; we would simply be allowing the default
> > > policy to work on windows.
> > >
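A rough sketch of how such an extended template could be expanded in a single pass. It assumes, as described above, that SSS stands for the shard index and NNN for the number of shards, and it additionally assumes that the length of each S or N run sets the zero-padded width; WWW and PPP are replaced verbatim by window and pane strings computed elsewhere. ShardTemplate is a hypothetical helper, not DefaultFilenamePolicy's actual code. Expanding in one pass means letters inside the window or pane string are never re-interpreted as tokens. The arrow-style switch needs Java 14+.

```java
final class ShardTemplate {
    private ShardTemplate() {}

    /** Builds prefix + expanded template + suffix; only the template is scanned for tokens. */
    static String name(String prefix, String template, String suffix,
                       int shard, int numShards, String window, String pane) {
        StringBuilder out = new StringBuilder(prefix);
        int i = 0;
        while (i < template.length()) {
            char c = template.charAt(i);
            int j = i;
            while (j < template.length() && template.charAt(j) == c) {
                j++; // find the end of the run of identical characters
            }
            int width = j - i;
            switch (c) {
                case 'S' -> out.append(pad(shard, width));     // shard index
                case 'N' -> out.append(pad(numShards, width)); // number of shards
                case 'W' -> out.append(window);                // window string
                case 'P' -> out.append(pane);                  // pane string
                default -> out.append(template, i, j);         // literal characters copied as-is
            }
            i = j;
        }
        return out.append(suffix).toString();
    }

    /** Zero-pads value to the given width, e.g. pad(3, 3) gives "003". */
    private static String pad(int value, int width) {
        return String.format("%0" + width + "d", value);
    }

    public static void main(String[] args) {
        // prints /tmp/abc-003-of-010-w-p.suff
        System.out.println(name("/tmp/abc", "-SSS-of-NNN-WWW-PPP", ".suff", 3, 10, "w", "p"));
    }
}
```

In this sketch, a template with no W or P runs, such as -SSSSS-of-NNNNN, is unaffected by the new tokens, which is the kind of backwards compatibility described above.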
> > > On Thu, May 11, 2017 at 8:41 AM, Dan Halperin <[email protected]> wrote:
> > >
> > >> +Eugene, Reuven who reviewed and implemented this code. They may have
> > >> opinions.
> > >>
> > >> Note that changing the default filename policy would be
> > >> backwards-incompatible, so this would either need to go into 2.0.0 (and a
> > >> new RC3) or it would not go in.
> > >>
> > >> On Thu, May 11, 2017 at 8:36 AM, Borisa Zivkovic <[email protected]> wrote:
> > >>
> > >>> Great, JB, thanks.
> > >>>
> > >>> I do not mind working on this; let's see if anyone else has additional
> > >>> input.
> > >>>
> > >>> cheers
> > >>>
> > >>> On Thu, 11 May 2017 at 16:28 Jean-Baptiste Onofré <[email protected]> wrote:
> > >>>
> > >>> > Got it.
> > >>> >
> > >>> > Yes, agreed. I think the PerWindowFilesPolicy could be the default,
> > >>> > letting the user provide their own policy if they want to.
> > >>> >
> > >>> > Regards
> > >>> > JB
> > >>> >
> > >>> > On 05/11/2017 05:23 PM, Borisa Zivkovic wrote:
> > >>> > > Hi JB,
> > >>> > >
> > >>> > > Yes, I saw that thread. I also copied your code but did not want to
> > >>> > > pollute it with my proposal :)
> > >>> > >
> > >>> > > Well, OK, maybe a default FilePerWindow policy for windowedWrites in
> > >>> > > TextIO does not make sense. Not sure, TBH...
> > >>> > >
> > >>> > > But would it make sense to promote a version of PerWindowFiles from
> > >>> > > https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
> > >>> > > so that it is easier to provide some kind of PerWindowFiles filename
> > >>> > > policy?
> > >>> > >
> > >>> > > Something like this (where the user does not have to write
> > >>> > > PerWindowFilesPolicy; it comes with Beam):
> > >>> > >
> > >>> > > .withFilenamePolicy(PerWindowFilesPolicy.withSuffix("mySuffix"))
> > >>> > > .withWindowedWrites()
> > >>> > > .withNumShards(1));
> > >>> > >
> > >>> > > Not sure if this was already discussed...
> > >>> > >
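A plain-Java sketch of the kind of ready-made per-window policy described above. PerWindowFilesPolicy, withSuffix and filenameFor are hypothetical names built around the snippet; a real version would have to implement Beam's FilenamePolicy, whose method signatures are not reproduced here. The prefix-start-end-shard-of-count layout and the UTC timestamp format are assumptions.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class PerWindowFilesPolicy {
    // Filename-safe UTC timestamps without ':' characters, e.g. 20170511T150000Z (assumed format).
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'").withZone(ZoneOffset.UTC);

    private final String suffix;

    private PerWindowFilesPolicy(String suffix) {
        this.suffix = suffix;
    }

    /** Hypothetical factory mirroring the withSuffix(...) call in the snippet. */
    static PerWindowFilesPolicy withSuffix(String suffix) {
        return new PerWindowFilesPolicy(suffix);
    }

    /** One file per (window, shard): prefix-start-end-shard-of-numShards followed by the suffix. */
    String filenameFor(String prefix, Instant windowStart, Instant windowEnd, int shard, int numShards) {
        return String.format("%s-%s-%s-%d-of-%d%s",
                prefix, FORMAT.format(windowStart), FORMAT.format(windowEnd), shard, numShards, suffix);
    }

    public static void main(String[] args) {
        PerWindowFilesPolicy policy = PerWindowFilesPolicy.withSuffix(".suff");
        // prints /tmp/abc-20170511T150000Z-20170511T150005Z-0-of-1.suff
        System.out.println(policy.filenameFor("/tmp/abc",
                Instant.parse("2017-05-11T15:00:00Z"), Instant.parse("2017-05-11T15:00:05Z"), 0, 1));
    }
}
```

Encoding both window bounds keeps filenames unique across windows even when several windows are written with the same shard number.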
> > >>> > > cheers
> > >>> > > Borisa
> > >>> > >
> > >>> > >
> > >>> > > On Thu, 11 May 2017 at 16:15 Jean-Baptiste Onofré <[email protected]> wrote:
> > >>> > >
> > >>> > >> Hi Borisa,
> > >>> > >>
> > >>> > >> You can take a look at the other thread ("Direct runner doesn't seem to
> > >>> > >> finalize checkpoint "quickly"").
> > >>> > >>
> > >>> > >> It's basically the same point ;)
> > >>> > >>
> > >>> > >> The default (event-time) trigger doesn't fire, so no data is emitted.
> > >>> > >> I'm investigating the element timestamp and watermark.
> > >>> > >>
> > >>> > >> I'm also playing with that, for instance:
> > >>> > >>
> > >>> > >> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
> > >>> > >>
> > >>> > >> When you use WindowedWrite, you have to provide a filename policy. We
> > >>> > >> could provide a default one, but I'm not sure it would fit well (it
> > >>> > >> depends a lot on the use case).
> > >>> > >>
> > >>> > >> Regards
> > >>> > >> JB
> > >>> > >>
> > >>> > >> On 05/11/2017 05:01 PM, Borisa Zivkovic wrote:
> > >>> > >>> Hi guys,
> > >>> > >>>
> > >>> > >>> Just playing with reading data from PubSub and writing using TextIO.
> > >>> > >>>
> > >>> > >>> The first thing is that it is very hard to get any output: a lot of
> > >>> > >>> temp files are written, but the final files do not always get created.
> > >>> > >>>
> > >>> > >>> So, I am playing with triggers etc. If I do the following:
> > >>> > >>>
> > >>> > >>> PCollection<String> streamData = p.apply(
> > >>> > >>>     PubsubIO.readStrings().fromTopic("projects/" + PROJECT_NAME + "/topics/myTopic"));
> > >>> > >>>
> > >>> > >>> streamData.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
> > >>> > >>>         .triggering(Repeatedly.forever(
> > >>> > >>>             AfterProcessingTime.pastFirstElementInPane()
> > >>> > >>>                 .plusDelayOf(Duration.standardSeconds(3))))
> > >>> > >>>         .withAllowedLateness(Duration.ZERO)
> > >>> > >>>         .discardingFiredPanes())
> > >>> > >>>     .apply(TextIO.write().to("/tmp/abc").withWindowedWrites()
> > >>> > >>>         .withSuffix(".suff").withNumShards(10));
> > >>> > >>>
> > >>> > >>> p.run();
> > >>> > >>>
> > >>> > >>> I would expect to see some files in /tmp/ with final results. Unless I
> > >>> > >>> add good triggers, I usually do not get any data, only temp files in
> > >>> > >>> /temp/.beam/
> > >>> > >>>
> > >>> > >>> But sometimes, when data should be written, I get the following
> > >>> > >>> exception:
> > >>> > >>>
> > >>> > >>> Exception in thread "main"
> > >>> > >>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> > >>> > >>> java.lang.UnsupportedOperationException: There is no default policy for
> > >>> > >>> windowed file output. Please provide an explicit FilenamePolicy to
> > >>> > >>> generate filenames.
> > >>> > >>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
> > >>> > >>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
> > >>> > >>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
> > >>> > >>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> > >>> > >>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
> > >>> > >>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
> > >>> > >>>     at Test.main(Test.java:50)
> > >>> > >>>
> > >>> > >>>
> > >>> > >>> Would it make sense to change TextIO so that it does not rely only on
> > >>> > >>> DefaultFilenamePolicy? When there are windowedWrites and no filename
> > >>> > >>> policy was specified by the user, it could automatically use a custom
> > >>> > >>> FilePerWindow policy. I believe that today TextIO always expects the
> > >>> > >>> user to specify a FilenamePolicy, right?
> > >>> > >>>
> > >>> > >>> Or maybe a FilePerWindow policy could be exposed as part of Beam. I
> > >>> > >>> believe that today there are only implementations in tests and
> > >>> > >>> examples, but nothing publicly visible, right?
> > >>> > >>>
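The fallback proposed above could look roughly like the sketch below: an explicit policy always wins, and windowed writes without one get a per-window default instead of failing. PolicyResolution, FilenamePolicy and the two default constants are hypothetical stand-ins; resolveToday only mimics the UnsupportedOperationException from the stack trace above and is not TextIO's actual code path.

```java
import java.util.Optional;

final class PolicyResolution {
    /** Hypothetical stand-in for a filename policy; only its description matters here. */
    interface FilenamePolicy {
        String describe();
    }

    static final FilenamePolicy DEFAULT_UNWINDOWED = () -> "default (shard template)";
    static final FilenamePolicy DEFAULT_PER_WINDOW = () -> "default per-window";

    private PolicyResolution() {}

    /** Proposed behaviour: a per-window default replaces the exception for windowed writes. */
    static FilenamePolicy resolve(Optional<FilenamePolicy> userPolicy, boolean windowedWrites) {
        if (userPolicy.isPresent()) {
            return userPolicy.get(); // an explicit policy always wins
        }
        return windowedWrites ? DEFAULT_PER_WINDOW : DEFAULT_UNWINDOWED;
    }

    /** Mimics the reported behaviour: windowed writes without an explicit policy fail. */
    static FilenamePolicy resolveToday(Optional<FilenamePolicy> userPolicy, boolean windowedWrites) {
        if (userPolicy.isPresent()) {
            return userPolicy.get();
        }
        if (windowedWrites) {
            throw new UnsupportedOperationException(
                    "There is no default policy for windowed file output. "
                            + "Please provide an explicit FilenamePolicy to generate filenames.");
        }
        return DEFAULT_UNWINDOWED;
    }

    public static void main(String[] args) {
        System.out.println(resolve(Optional.empty(), true).describe()); // default per-window
        try {
            resolveToday(Optional.empty(), true);
        } catch (UnsupportedOperationException e) {
            System.out.println("today: " + e.getMessage());
        }
    }
}
```

Because a user-supplied policy is checked first in both variants, the change only affects pipelines that currently fail, which fits the point about backwards compatibility raised earlier in the thread.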
> > >>> > >>>
> > >>> > >>>
> > >>> > >>> thanks
> > >>> > >>>
> > >>> > >>
> > >>> > >> --
> > >>> > >> Jean-Baptiste Onofré
> > >>> > >> [email protected]
> > >>> > >> http://blog.nanthrax.net
> > >>> > >> Talend - http://www.talend.com
> > >>> > >>
> > >>> > >
> > >>> >
> > >>>
> > >>
> > >>
> >
>
