Great... created this

https://issues.apache.org/jira/browse/BEAM-2276



On Fri, 12 May 2017 at 09:38 Jean-Baptiste Onofré <[email protected]> wrote:

> +1
>
> Borisa, if you want, we can work together on this.
>
> Thanks !
> Regards
> JB
>
> On 05/12/2017 10:33 AM, Borisa Zivkovic wrote:
> > +1 for DefaultFilenamePolicy being able to understand basic windowing...
> > probably the most
> > user-friendly way that would cover most of needs... in case of special
> > needs  users can provide their own policy..
> >
> > another alternative would be to have new class called
> > DefaultWindowedFilenamePolicy in package org.apache.beam.sdk.io ...
> >
> > any of those would make it easier for Beam users..
> >
> > so, someone needs to decide how we want to do this and if you want I can
> > work on it...
> >
> > cheers
> >
> > On Fri, 12 May 2017 at 08:18 Reuven Lax <[email protected]> wrote:
> >
> >> I believe that for most windows there is a standard stringification.
> >> However I think we could allow the user to inject a window formatter for
> >> cases where there is no good default (e.g. where the window is a
> >> complicated user-defined type, and toString() isn't good enough).
> >>
> >> Alternatively, if we don't want to allow formatters, we could make
> >> DefaultFilenamePolicy work with default stringifications of well-known
> >> windows (fixed, sliding, sessions, etc.), and just use toString() for
> >> remaining window types. Users that have weird custom window types can
> >> always write their own FilenamePolicy.
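
The formatter-with-fallback idea above could look roughly like the sketch below. It is only an illustration under assumptions: WindowFormatterSketch and its format method are hypothetical names, not Beam API, and the window is treated as an arbitrary object.

```java
import java.util.function.Function;

/** Hypothetical helper (not part of Beam): stringify a window for use in a filename. */
final class WindowFormatterSketch {

  /**
   * Uses the user-injected formatter when one is given; otherwise falls back
   * to the window's own toString(), as suggested for unknown window types.
   */
  static <W> String format(W window, Function<? super W, String> formatter) {
    return formatter != null ? formatter.apply(window) : String.valueOf(window);
  }
}
```

Passing null as the formatter gives the toString() behaviour; a user with a complicated window type would pass their own function instead.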
> >>
> >> On Thu, May 11, 2017 at 4:24 PM, Robert Bradshaw <
> >> [email protected]> wrote:
> >>
> >>> I like the idea of WWW and PPP, assuming there is a standard enough
> >>> stringification of windows and panes. However, we may want to elide
> >>> adjacent tokens if the window is global or the pane is the only
> >>> possible (or first?) one to avoid writing things like
> >>> -0000-of-0005---.
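
One way to avoid the dangling separators described above is to drop empty parts before joining. A minimal sketch, assuming (this is not stated in the thread) that the global window and a sole pane stringify to the empty string; ElideEmptyTokensSketch is a hypothetical name, not Beam API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Beam): join filename parts, skipping empty ones. */
final class ElideEmptyTokensSketch {

  /** Joins the non-empty parts with "-", so empty window/pane tokens leave no stray dashes. */
  static String join(String... parts) {
    List<String> kept = new ArrayList<>();
    for (String part : parts) {
      if (part != null && !part.isEmpty()) {
        kept.add(part);
      }
    }
    return String.join("-", kept);
  }
}
```

With an empty window and pane, join("output", "0000-of-0005", "", "") yields "output-0000-of-0005" rather than a name ending in "---".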
> >>>
> >>> On Thu, May 11, 2017 at 8:47 AM, Reuven Lax <[email protected]>
> >>> wrote:
> >>>> Another idea - we can extend the existing pattern that
> >>>> DefaultFilenamePolicy understands to include windows.
> >>>>
> >>>> Today it replaces SSS with the shard, and NNN with the number of shards
> >>>> (so many templates contain -SSS-of-NNN). We could also have it recognize
> >>>> WWW and PPP, for the window and the pane respectively.
> >>>>
> >>>> I believe this would be a backwards-compatible change. We do not need to
> >>>> change any existing interfaces, we would simply be allowing the default
> >>>> policy to work on windows.
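
To make the proposal concrete, here is a rough sketch of how a template with WWW and PPP might be expanded. It is not the DefaultFilenamePolicy implementation: ShardTemplateSketch is a hypothetical class, the window and pane arrive as already-formatted strings, and zero-padding S and N to the length of the run is an assumption about the existing behaviour.

```java
import java.util.Locale;

/** Hypothetical helper (not part of Beam): expands S/N/W/P runs in a shard template. */
final class ShardTemplateSketch {

  static String expand(String template, int shard, int numShards, String window, String pane) {
    StringBuilder out = new StringBuilder();
    int i = 0;
    while (i < template.length()) {
      char c = template.charAt(i);
      // Find the end of the run of identical characters starting at i.
      int j = i;
      while (j < template.length() && template.charAt(j) == c) {
        j++;
      }
      int runLength = j - i;
      switch (c) {
        case 'S': out.append(zeroPad(shard, runLength)); break;
        case 'N': out.append(zeroPad(numShards, runLength)); break;
        case 'W': out.append(window); break;
        case 'P': out.append(pane); break;
        default: out.append(template, i, j); // literal text is copied unchanged
      }
      i = j;
    }
    return out.toString();
  }

  /** Zero-pads a non-negative value to the given width. */
  private static String zeroPad(int value, int width) {
    return String.format(Locale.ROOT, "%0" + width + "d", value);
  }
}
```

Templates that only use S and N expand the same way with or without the W/P cases, which is what would keep such a change backwards-compatible. A template containing a literal W or P would change meaning, so that would need checking.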
> >>>>
> >>>> On Thu, May 11, 2017 at 8:41 AM, Dan Halperin <[email protected]> wrote:
> >>>>
> >>>>> +Eugene, Reuven who reviewed and implemented this code. They may have
> >>>>> opinions.
> >>>>>
> >>>>> Note that changing the default filename policy would be
> >>>>> backwards-incompatible, so this would either need to go into 2.0.0 (and a
> >>>>> new RC3) or it would not go in.
> >>>>>
> >>>>> On Thu, May 11, 2017 at 8:36 AM, Borisa Zivkovic <
> >>>>> [email protected]> wrote:
> >>>>>
> >>>>>> great JB, thanks
> >>>>>>
> >>>>>> I do not mind working on this - let's see if anyone else has additional
> >>>>>> input.
> >>>>>>
> >>>>>> cheers
> >>>>>>
> >>>>>> On Thu, 11 May 2017 at 16:28 Jean-Baptiste Onofré <[email protected]>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Got it.
> >>>>>>>
> >>>>>>> Yes, agreed. I think the PerWindowFilesPolicy could be the default, and
> >>>>>>> users could provide their own policy if they want to.
> >>>>>>>
> >>>>>>> Regards
> >>>>>>> JB
> >>>>>>>
> >>>>>>> On 05/11/2017 05:23 PM, Borisa Zivkovic wrote:
> >>>>>>>> Hi  JB,
> >>>>>>>>
> >>>>>>>> yes I saw that thread - I also copied your code but did not want to
> >>>>>>>> pollute it with my proposal :)
> >>>>>>>>
> >>>>>>>> Well ok maybe default FilePerWindow policy for windowedWrites in TextIO
> >>>>>>>> does not make sense - not sure TBH...
> >>>>>>>>
> >>>>>>>> But would it make sense to promote a version of PerWindowFiles from
> >>>>>>>> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
> >>>>>>>> so that it is easier to provide some kind of PerWindowFiles filename
> >>>>>>>> policy..
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> something like (where user does not have to write PerWindowFilesPolicy,
> >>>>>>>> it comes with Beam)
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> .withFilenamePolicy(PerWindowFilesPolicy.withSuffix("mySuffix"))
> >>>>>>>> .withWindowedWrites()
> >>>>>>>> .withNumShards(1));
> >>>>>>>>
> >>>>>>>> not sure if this was already discussed...
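
For illustration, a per-window policy of this kind would mostly come down to building a name from the window bounds and the shard. The sketch below shows only that naming step and makes several assumptions: PerWindowNameSketch is hypothetical and is not the PerWindowFiles class from the linked sample, the window is represented by plain java.time.Instant bounds, and the timestamp pattern and 5-digit shard width are arbitrary choices.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

/** Hypothetical helper (not part of Beam): builds one filename per window and shard. */
final class PerWindowNameSketch {

  // Compact UTC timestamp; the pattern is an assumption chosen for readability.
  private static final DateTimeFormatter FORMAT =
      DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss", Locale.ROOT).withZone(ZoneOffset.UTC);

  /** Returns prefix-start-end-shard-of-numShards followed by the suffix. */
  static String filename(
      String prefix, Instant start, Instant end, int shard, int numShards, String suffix) {
    return String.format(
        Locale.ROOT,
        "%s-%s-%s-%05d-of-%05d%s",
        prefix, FORMAT.format(start), FORMAT.format(end), shard, numShards, suffix);
  }
}
```

A real policy would take the window and shard information from the context that TextIO passes to the FilenamePolicy; that wiring is left out here.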
> >>>>>>>>
> >>>>>>>> cheers
> >>>>>>>> Borisa
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Thu, 11 May 2017 at 16:15 Jean-Baptiste Onofré <[email protected]>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Borisa,
> >>>>>>>>>
> >>>>>>>>> You can take a look at the other thread ("Direct runner doesn't seem to
> >>>>>>>>> finalize checkpoint "quickly"").
> >>>>>>>>>
> >>>>>>>>> It's basically the same point ;)
> >>>>>>>>>
> >>>>>>>>> The default trigger (event-time) doesn't fire any data. I'm investigating
> >>>>>>>>> the element timestamp and watermark.
> >>>>>>>>>
> >>>>>>>>> I'm also playing with that, for instance:
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
> >>>>>>>>>
> >>>>>>>>> When you use WindowedWrite, you have to provide a filename policy. We
> >>>>>>>>> could provide a default one, but I'm not sure it would fit well (as it
> >>>>>>>>> depends a lot on the use case).
> >>>>>>>>>
> >>>>>>>>> Regards
> >>>>>>>>> JB
> >>>>>>>>>
> >>>>>>>>> On 05/11/2017 05:01 PM, Borisa Zivkovic wrote:
> >>>>>>>>>> Hi guys,
> >>>>>>>>>>
> >>>>>>>>>> just playing with reading data from PubSub and writing using TextIO.
> >>>>>>>>>>
> >>>>>>>>>> First thing is that it is very hard to get any output - a lot of temp
> >>>>>>>>>> files are written, but the final files are not always created.
> >>>>>>>>>>
> >>>>>>>>>> So, I am playing with triggers etc... If I do the following
> >>>>>>>>>>
> >>>>>>>>>> PCollection<String> streamData = p.apply(
> >>>>>>>>>>     PubsubIO.readStrings().fromTopic(
> >>>>>>>>>>         "projects/" + PROJECT_NAME + "/topics/myTopic"));
> >>>>>>>>>>
> >>>>>>>>>> streamData
> >>>>>>>>>>     .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
> >>>>>>>>>>         .triggering(Repeatedly.forever(
> >>>>>>>>>>             AfterProcessingTime.pastFirstElementInPane()
> >>>>>>>>>>                 .plusDelayOf(Duration.standardSeconds(3))))
> >>>>>>>>>>         .withAllowedLateness(Duration.ZERO)
> >>>>>>>>>>         .discardingFiredPanes())
> >>>>>>>>>>     .apply(TextIO.write().to("/tmp/abc").withWindowedWrites()
> >>>>>>>>>>         .withSuffix(".suff").withNumShards(10));
> >>>>>>>>>>
> >>>>>>>>>> p.run();
> >>>>>>>>>>
> >>>>>>>>>> I would expect to see some files in /tmp/ with final results.. unless I
> >>>>>>>>>> add good triggers I usually do not get any data.. only temp files in
> >>>>>>>>>> /temp/.beam/
> >>>>>>>>>>
> >>>>>>>>>> but sometimes when data should be written I get the following exception
> >>>>>>>>>>
> >>>>>>>>>> Exception in thread "main"
> >>>>>>>>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> >>>>>>>>>> java.lang.UnsupportedOperationException: There is no default policy for
> >>>>>>>>>> windowed file output. Please provide an explicit FilenamePolicy to
> >>>>>>>>>> generate filenames.
> >>>>>>>>>> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
> >>>>>>>>>> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
> >>>>>>>>>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
> >>>>>>>>>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> >>>>>>>>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
> >>>>>>>>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
> >>>>>>>>>> at Test.main(Test.java:50)
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Would it make sense to change TextIO so that it does not
> >>>>>>>>>> use DefaultFilenamePolicy only - but in case there are windowedWrites
> >>>>>>>>>> and no filename policy was specified by user it could actually use
> >>>>>>>>>> custom FilePerWindow policy automatically. I believe today TextIO always
> >>>>>>>>>> expects user to specify FilenamePolicy, right?
> >>>>>>>>>>
> >>>>>>>>>> Or maybe to have FilePerWindow policy exposed as part of Beam - I
> >>>>>>>>>> believe today there are only implementations in tests and examples but
> >>>>>>>>>> nothing publicly visible, right?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> thanks
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Jean-Baptiste Onofré
> >>>>>>>>> [email protected]
> >>>>>>>>> http://blog.nanthrax.net
> >>>>>>>>> Talend - http://www.talend.com
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> Jean-Baptiste Onofré
> >>>>>>> [email protected]
> >>>>>>> http://blog.nanthrax.net
> >>>>>>> Talend - http://www.talend.com
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>
> >>
> >
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
