Created PR https://github.com/apache/beam/pull/3142

On Fri, 12 May 2017 at 16:31 Reuven Lax <[email protected]> wrote:

> Can we simply fail WWW if windowed writes is not set? Or at least warn?
>
> On May 12, 2017 6:14 PM, "Dan Halperin" <[email protected]>
> wrote:
>
> DefaultFilenamePolicy as currently written only accepts a single shard name
> template. If that template is windowed, it won't work for unwindowed writes
> (it will have -WWW-PPP or .WWW.PPP or something.WWW.something.PPPP?). And
> vice versa for unwindowed templates on windowed writes.
>
> There is likely a way to fix this more easily in the File IOs themselves.
>
> On Fri, May 12, 2017 at 4:33 AM, Jean-Baptiste Onofré <[email protected]>
> wrote:
>
> > Hi Reuven,
> >
> > yes, it's what I have in mind: just provide the default logic in the
> > existing method.
> >
> > Regards
> > JB
> >
> >
> > On 05/12/2017 12:46 PM, Reuven Lax wrote:
> >
> >> DefaultFilenamePolicy already contains a windowedFilename override (today
> >> it throws an exception), so I don't think there's any need for a new
> >> class.
> >> We can simply fill out the existing method.
> >>
> >> On May 12, 2017 11:34 AM, "Borisa Zivkovic" <[email protected]> wrote:
> >>
> >> +1 for DefaultFilenamePolicy being able to understand basic windowing...
> >> probably the most
> >> user-friendly way that would cover most of needs... in case of special
> >> needs  users can provide their own policy..
> >>
> >> another alternative would be to have new class called
> >> DefaultWindowedFilenamePolicy in package org.apache.beam.sdk.io ...
> >>
> >> any of those would make it easier for Beam users..
> >>
> >> so, someone needs to decide how we want to do this and if you want I can
> >> work on it...
> >>
> >> cheers
> >>
> >> On Fri, 12 May 2017 at 08:18 Reuven Lax <[email protected]> wrote:
> >>
> >>> I believe that for most windows there is a standard stringification.
> >>> However I think we could allow the user to inject a window formatter for
> >>> cases where there is no good default (e.g. where the window is a
> >>> complicated user-defined type, and toString() isn't good enough).
> >>>
> >>> Alternatively, if we don't want to allow formatters, we could make
> >>> DefaultFilenamePolicy work with default stringifications of well-known
> >>> windows (fixed, sliding, sessions, etc.), and just use toString() for
> >>> remaining window types. Users that have weird custom window types can
> >>> always write their own FilenamePolicy.
> >>>
> >>> On Thu, May 11, 2017 at 4:24 PM, Robert Bradshaw <
> >>> [email protected]> wrote:
> >>>
> >>>> I like the idea of WWW and PPP, assuming there is a standard enough
> >>>> stringification of windows and panes. However, we may want to elide
> >>>> adjacent tokens if the window is global or the pane is the only
> >>>> possible (or first?) one to avoid writing things like
> >>>> -0000-of-0005---.
> >>>>
> >>>> On Thu, May 11, 2017 at 8:47 AM, Reuven Lax <[email protected]> wrote:
> >>>>
> >>>>> Another idea - we can extend the existing pattern that
> >>>>> DefaultFilenamePolicy understands to include windows.
> >>>>>
> >>>>> Today it replaces SSS with the shard, and NNN with the number of shards
> >>>>> (so many templates contain -SSS-of-NNN). We could also have it recognize
> >>>>> WWW and PPP, for the window and the pane respectively.
> >>>>>
> >>>>> I believe this would be a backwards-compatible change. We do not need to
> >>>>> change any existing interfaces, we would simply be allowing the default
> >>>>> policy to work on windows.
> >>>>>
> >>>>>
> >>>>> On Thu, May 11, 2017 at 8:41 AM, Dan Halperin <[email protected]> wrote:
> >>>>>
> >>>>>> +Eugene, Reuven who reviewed and implemented this code. They may have
> >>>>>> opinions.
> >>>>>>
> >>>>>> Note that changing the default filename policy would be
> >>>>>> backwards-incompatible, so this would either need to go into 2.0.0 (and
> >>>>>> a new RC3) or it would not go in.
> >>>>>>
> >>>>>> On Thu, May 11, 2017 at 8:36 AM, Borisa Zivkovic <
> >>>>>> [email protected]> wrote:
> >>>>>>
> >>>>>>> great JB, thanks
> >>>>>>>
> >>>>>>> I do not mind working on this - let's see if anyone else has additional
> >>>>>>> input.
> >>>>>>>
> >>>>>>> cheers
> >>>>>>>
> >>>>>>> On Thu, 11 May 2017 at 16:28 Jean-Baptiste Onofré <[email protected]> wrote:
> >>>>>>>
> >>>>>>>> Got it.
> >>>>>>>>
> >>>>>>>> Yes, agreed. I think the PerWindowFilesPolicy could be the default,
> >>>>>>>> and we could let the user provide their own policy if they want to.
> >>>>>>>>
> >>>>>>>> Regards
> >>>>>>>> JB
> >>>>>>>>
> >>>>>>>> On 05/11/2017 05:23 PM, Borisa Zivkovic wrote:
> >>>>>>>>
> >>>>>>>>> Hi JB,
> >>>>>>>>>
> >>>>>>>>> yes I saw that thread - I also copied your code but did not want to
> >>>>>>>>> pollute it with my proposal :)
> >>>>>>>>>
> >>>>>>>>> Well ok maybe default FilePerWindow policy for windowedWrites in TextIO
> >>>>>>>>> does not make sense - not sure TBH...
> >>>>>>>>>
> >>>>>>>>> But would it make sense to promote a version of PerWindowFiles from
> >>>>>>>>>
> >>>>>>>>> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
> >>>>>>>>>
> >>>>>>>>> so that it is easier to provide some kind of PerWindowFiles filename
> >>>>>>>>> policy..
> >>>>>>>>>
> >>>>>>>>> something like (where user does not have to write PerWindowFilesPolicy,
> >>>>>>>>> it comes with Beam)
> >>>>>>>>>
> >>>>>>>>> .withFilenamePolicy(PerWindowFilesPolicy.withSuffix("mySuffix"))
> >>>>>>>>> .withWindowedWrites()
> >>>>>>>>> .withNumShards(1));
> >>>>>>>>>
> >>>>>>>>> not sure if this was already discussed...
> >>>>>>>>>
> >>>>>>>>> cheers
> >>>>>>>>> Borisa
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Thu, 11 May 2017 at 16:15 Jean-Baptiste Onofré <[email protected]> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Borisa,
> >>>>>>>>>>
> >>>>>>>>>> You can take a look at the other thread ("Direct runner doesn't seem
> >>>>>>>>>> to finalize checkpoint "quickly"").
> >>>>>>>>>>
> >>>>>>>>>> It's basically the same point ;)
> >>>>>>>>>>
> >>>>>>>>>> The default trigger (event-time) doesn't fire any data. I'm
> >>>>>>>>>> investigating the element timestamp and watermark.
> >>>>>>>>>>
> >>>>>>>>>> I'm also playing with that, for instance:
> >>>>>>>>>>
> >>>>>>>>>> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
> >>>>>>>>>>
> >>>>>>>>>> When you use WindowedWrite, you have to provide a filename policy. We
> >>>>>>>>>> could provide a default one, but not sure it will fit fine (as it
> >>>>>>>>>> depends a lot on the use cases).
> >>>>>>>>>>
> >>>>>>>>>> Regards
> >>>>>>>>>> JB
> >>>>>>>>>>
> >>>>>>>>>> On 05/11/2017 05:01 PM, Borisa Zivkovic wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi guys,
> >>>>>>>>>>>
> >>>>>>>>>>> just playing with reading data from PubSub and writing using TextIO.
> >>>>>>>>>>>
> >>>>>>>>>>> First thing is that it is very hard to get any output - a lot of temp
> >>>>>>>>>>> files written but not always would get final files created.
> >>>>>>>>>>>
> >>>>>>>>>>> So, I am playing with triggers etc... If I do following
> >>>>>>>>>>>
> >>>>>>>>>>> PCollection<String> streamData = p.apply(
> >>>>>>>>>>>         PubsubIO.readStrings().fromTopic("projects/" + PROJECT_NAME + "/topics/myTopic"));
> >>>>>>>>>>>
> >>>>>>>>>>> streamData.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
> >>>>>>>>>>>             .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(3))))
> >>>>>>>>>>>             .withAllowedLateness(Duration.ZERO)
> >>>>>>>>>>>             .discardingFiredPanes())
> >>>>>>>>>>>     .apply(TextIO.write().to("/tmp/abc").withWindowedWrites()
> >>>>>>>>>>>     .withSuffix(".suff").withNumShards(10));
> >>>>>>>>>>>
> >>>>>>>>>>>     p.run();
> >>>>>>>>>>>
> >>>>>>>>>>> I would expect to see some files in /tmp/ with final results.. unless
> >>>>>>>>>>> I add good triggers I usually do not get any data.. only temp files in
> >>>>>>>>>>> /temp/.beam/
> >>>>>>>>>>>
> >>>>>>>>>>> but sometimes when data should be written I get following exception
> >>>>>>>>>>>
> >>>>>>>>>>> Exception in thread "main"
> >>>>>>>>>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> >>>>>>>>>>> java.lang.UnsupportedOperationException: There is no default policy for
> >>>>>>>>>>> windowed file output. Please provide an explicit FilenamePolicy to
> >>>>>>>>>>> generate filenames.
> >>>>>>>>>>> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
> >>>>>>>>>>> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
> >>>>>>>>>>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
> >>>>>>>>>>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> >>>>>>>>>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
> >>>>>>>>>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
> >>>>>>>>>>> at Test.main(Test.java:50)
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Would it make sense to change TextIO so that it does not
> >>>>>>>>>>> use DefaultFilenamePolicy only - but in case there are windowedWrites
> >>>>>>>>>>> and no filename policy was specified by user it could actually use
> >>>>>>>>>>> custom FilePerWindow policy automatically. I believe today TextIO
> >>>>>>>>>>> always expects user to specify FilenamePolicy, right?
> >>>>>>>>>>>
> >>>>>>>>>>> Or maybe to have FilePerWindow policy exposed as part of Beam - I
> >>>>>>>>>>> believe today there are only implementations in tests and examples but
> >>>>>>>>>>> nothing publicly visible, right?
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> thanks
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>> Jean-Baptiste Onofré
> >>>>>>>>>> [email protected]
> >>>>>>>>>> http://blog.nanthrax.net
> >>>>>>>>>> Talend - http://www.talend.com
> > --
> > Jean-Baptiste Onofré
> > [email protected]
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
> >
>
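
For reference, a rough sketch of the WWW/PPP template idea from the thread above, including the suggestion to elide tokens when the window or pane has nothing useful to print. This is plain Java and does not use Beam; the class name ShardTemplateSketch, the expand method and its parameters are made up for illustration, and this is not how DefaultFilenamePolicy or the PR implements it. It assumes the window and pane have already been turned into strings, and it simplifies by treating any run of S, N, W or P as a token.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of template expansion; not Beam's DefaultFilenamePolicy. */
public class ShardTemplateSketch {

    // Simplification: any run of one repeated letter S, N, W or P is a token.
    private static final Pattern TOKEN = Pattern.compile("S+|N+|W+|P+");

    /**
     * Expands SSS (shard), NNN (shard count), WWW (window) and PPP (pane).
     * An empty window or pane string drops the token together with the
     * literal separator in front of it, so no dangling "-" is left behind.
     */
    public static String expand(
            String template, int shard, int numShards, String window, String pane) {
        Matcher m = TOKEN.matcher(template);
        StringBuilder out = new StringBuilder();
        int last = 0;
        while (m.find()) {
            String separator = template.substring(last, m.start());
            String token = m.group();
            String value;
            switch (token.charAt(0)) {
                case 'S': value = pad(shard, token.length()); break;
                case 'N': value = pad(numShards, token.length()); break;
                case 'W': value = window; break;
                default:  value = pane; break;
            }
            if (!value.isEmpty()) {
                out.append(separator).append(value);
            }
            last = m.end();
        }
        // Keep whatever literal text follows the last token.
        out.append(template.substring(last));
        return out.toString();
    }

    // Zero-pads n to the width of the token, e.g. SSS -> 3 digits.
    private static String pad(int n, int width) {
        return String.format("%0" + width + "d", n);
    }

    public static void main(String[] args) {
        String template = "-SSS-of-NNN-WWW-PPP";
        // Windowed write: prints -002-of-005-w1-p0
        System.out.println(expand(template, 2, 5, "w1", "p0"));
        // Unwindowed write, window and pane elided: prints -000-of-005
        System.out.println(expand(template, 0, 5, "", ""));
    }
}
```

With eliding like this, a single template could serve both windowed and unwindowed writes, which is the concern raised about DefaultFilenamePolicy accepting only one shard name template. Whether silently dropping tokens is preferable to failing or warning is a separate question from the thread and is not settled by this sketch.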
