Created PR https://github.com/apache/beam/pull/3142
On Fri, 12 May 2017 at 16:31 Reuven Lax wrote:

Can we simply fail on WWW if windowed writes is not set? Or at least warn?

On May 12, 2017 6:14 PM, "Dan Halperin" wrote:

DefaultFilenamePolicy as currently written only accepts a single shard name template. If that template is windowed, it won't work for unwindowed writes (it will have -WWW-PPP or .WWW.PPP or something.WWW.something.PPPP?). And vice versa for unwindowed templates used with windowed writes.

There is likely a way to fix this more easily in the File IOs themselves.

On Fri, May 12, 2017 at 4:33 AM, Jean-Baptiste Onofré wrote:

Hi Reuven,

yes, it's what I have in mind: just provide the default logic in the existing method.

Regards
JB

On 05/12/2017 12:46 PM, Reuven Lax wrote:

DefaultFilenamePolicy already contains a windowedFilename override (today it throws an exception), so I don't think there's any need for a new class. We can simply fill out the existing method.

On May 12, 2017 11:34 AM, "Borisa Zivkovic" wrote:

+1 for DefaultFilenamePolicy being able to understand basic windowing... probably the most user-friendly way, and it would cover most needs... in case of special needs users can provide their own policy..

another alternative would be to have a new class called DefaultWindowedFilenamePolicy in package org.apache.beam.sdk.io ...

either of those would make it easier for Beam users..

so, someone needs to decide how we want to do this, and if you want I can work on it...

cheers

On Fri, 12 May 2017 at 08:18 Reuven Lax wrote:

I believe that for most windows there is a standard stringification.
However, I think we could allow the user to inject a window formatter for cases where there is no good default (e.g. where the window is a complicated user-defined type and toString() isn't good enough).

Alternatively, if we don't want to allow formatters, we could make DefaultFilenamePolicy work with default stringifications of well-known windows (fixed, sliding, sessions, etc.), and just use toString() for the remaining window types. Users that have weird custom window types can always write their own FilenamePolicy.

On Thu, May 11, 2017 at 4:24 PM, Robert Bradshaw wrote:

I like the idea of WWW and PPP, assuming there is a standard enough stringification of windows and panes. However, we may want to elide adjacent tokens if the window is global or the pane is the only possible (or first?) one, to avoid writing things like -0000-of-0005---.

On Thu, May 11, 2017 at 8:47 AM, Reuven Lax wrote:

Another idea - we can extend the existing pattern that DefaultFilenamePolicy understands to include windows.

Today it replaces SSS with the shard, and NNN with the number of shards (so many templates contain -SSS-of-NNN). We could also have it recognize WWW and PPP, for the window and the pane respectively.

I believe this would be a backwards-compatible change. We do not need to change any existing interfaces; we would simply be allowing the default policy to work on windows.

On Thu, May 11, 2017 at 8:41 AM, Dan Halperin wrote:

+Eugene, Reuven who reviewed and implemented this code. They may have opinions.
Note that changing the default filename policy would be backwards-incompatible, so this would either need to go into 2.0.0 (and a new RC3) or it would not go in.

On Thu, May 11, 2017 at 8:36 AM, Borisa Zivkovic wrote:

great JB, thanks

I do not mind working on this - let's see if anyone else has additional input.

cheers

On Thu, 11 May 2017 at 16:28 Jean-Baptiste Onofré wrote:

Got it.

Yes, agreed: I think the PerWindowFilesPolicy could be the default, letting the user provide their own policy if they want to.

Regards
JB

On 05/11/2017 05:23 PM, Borisa Zivkovic wrote:

Hi JB,

yes I saw that thread - I also copied your code but did not want to pollute it with my proposal :)

Well ok, maybe a default FilePerWindow policy for windowedWrites in TextIO does not make sense - not sure TBH...

But would it make sense to promote a version of PerWindowFiles from

https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java

so that it is easier to provide some kind of PerWindowFiles filename policy..
something like this (where the user does not have to write PerWindowFilesPolicy because it comes with Beam):

    .withFilenamePolicy(PerWindowFilesPolicy.withSuffix("mySuffix"))
    .withWindowedWrites()
    .withNumShards(1));

not sure if this was already discussed...

cheers
Borisa

On Thu, 11 May 2017 at 16:15 Jean-Baptiste Onofré wrote:

Hi Borisa,

You can take a look at the other thread ("Direct runner doesn't seem to finalize checkpoint "quickly"").

It's basically the same point ;)

The default trigger (event-time) doesn't fire any data. I'm investigating the element timestamp and watermark.

I'm also playing with that, for instance:

https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java

When you use WindowedWrite, you have to provide a filename policy. We could provide a default one, but I'm not sure it would fit well (as it depends a lot on the use case).
Regards
JB

On 05/11/2017 05:01 PM, Borisa Zivkovic wrote:

Hi guys,

just playing with reading data from PubSub and writing using TextIO.

First thing is that it is very hard to get any output - a lot of temp files are written, but I would not always get the final files created.

So, I am playing with triggers etc... If I do the following

    PCollection<String> streamData = p.apply(
        PubsubIO.readStrings().fromTopic("projects/" + PROJECT_NAME + "/topics/myTopic"));

    streamData.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(3))))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())
        .apply(TextIO.write().to("/tmp/abc").withWindowedWrites()
            .withSuffix(".suff").withNumShards(10));

    p.run();

I would expect to see some files in /tmp/ with final results.. unless I add good triggers I usually do not get any data..
only temp files in /temp/.beam/

but sometimes, when data should be written, I get the following exception:

    Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.UnsupportedOperationException: There is no default policy for windowed file output. Please provide an explicit FilenamePolicy to generate filenames.
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
        at Test.main(Test.java:50)

Would it make sense to change TextIO so that it does not only use DefaultFilenamePolicy, but, when windowedWrites is set and no filename policy was specified by the user, automatically uses a custom FilePerWindow policy?
I believe today TextIO always expects the user to specify a FilenamePolicy, right?

Or maybe a FilePerWindow policy could be exposed as part of Beam - I believe today there are only implementations in tests and examples, but nothing publicly visible, right?

thanks

--
Jean-Baptiste Onofré
http://blog.nanthrax.net
Talend - http://www.talend.com
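Reuven's May 11 proposal (have DefaultFilenamePolicy recognize WWW and PPP alongside SSS and NNN) and Robert's suggestion to elide the window and pane tokens when they carry no information can be illustrated together. What follows is a minimal, self-contained Java sketch, not Beam's actual implementation: the class ShardTemplateSketch and its expand method are hypothetical, it assumes S and N runs are zero-padded to the length of the run, and it assumes the window and pane have already been turned into strings (empty for the global window or a sole pane).

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical sketch, not Beam's DefaultFilenamePolicy: expands a shard
 * template containing runs of S (shard index), N (shard count), W (window)
 * and P (pane) placeholders.
 */
final class ShardTemplateSketch {
  // A run of one placeholder letter, optionally preceded by one '-' or '.' separator.
  private static final Pattern TOKEN = Pattern.compile("([-.]?)(S+|N+|W+|P+)");

  private ShardTemplateSketch() {}

  /**
   * Builds prefix + expanded template + suffix. S and N runs are zero-padded
   * to the run length (assumption). An empty window or pane string drops the
   * token together with its preceding separator, so no dangling "--" remains.
   */
  static String expand(String prefix, String template, String suffix,
      int shard, int numShards, String window, String pane) {
    Matcher m = TOKEN.matcher(template);
    // The prefix is copied verbatim and never scanned for placeholders.
    StringBuilder out = new StringBuilder(prefix);
    int last = 0;
    while (m.find()) {
      // Copy literal text between placeholders (e.g. "-of") unchanged.
      out.append(template, last, m.start());
      String run = m.group(2);
      String value;
      switch (run.charAt(0)) {
        case 'S':
          value = String.format("%0" + run.length() + "d", shard);
          break;
        case 'N':
          value = String.format("%0" + run.length() + "d", numShards);
          break;
        case 'W':
          value = window;
          break;
        default: // 'P'
          value = pane;
          break;
      }
      if (!value.isEmpty()) {
        out.append(m.group(1)).append(value);
      }
      last = m.end();
    }
    out.append(template, last, template.length());
    return out.append(suffix).toString();
  }
}
```

For example, expand("out", "-SSSSS-of-NNNNN-WWW-PPP", ".txt", 3, 10, "", "") yields out-00003-of-00010.txt rather than a name ending in dangling separators. Keeping the prefix and suffix outside the substitution means a literal letter such as the S in a prefix is never mistaken for a placeholder.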
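Reuven's May 12 suggestion (default stringifications for well-known windows, toString() as the fallback, and an optional user-injected formatter) might look roughly like this. To keep it runnable without Beam on the classpath, SketchWindow, SketchIntervalWindow, SketchGlobalWindow and WindowFormatSketch are hypothetical stand-ins, and formatting an interval as start-end ISO-8601 instants is an assumption. In Beam, fixed, sliding and session windows all produce IntervalWindow instances, so one interval format covers all three.

```java
import java.time.Instant;
import java.util.function.Function;

// Hypothetical stand-in for Beam's window base type.
interface SketchWindow {}

/** Stand-in for an interval window (fixed, sliding and session windows in Beam). */
final class SketchIntervalWindow implements SketchWindow {
  final Instant start;
  final Instant end;

  SketchIntervalWindow(Instant start, Instant end) {
    this.start = start;
    this.end = end;
  }
}

/** Stand-in for the global window. */
enum SketchGlobalWindow implements SketchWindow { INSTANCE }

final class WindowFormatSketch {
  private WindowFormatSketch() {}

  /**
   * Returns the string substituted for the window placeholder. A user-supplied
   * formatter wins; otherwise well-known window types get a default and any
   * other window type falls back to toString().
   */
  static String format(SketchWindow window, Function<SketchWindow, String> userFormatter) {
    if (userFormatter != null) {
      return userFormatter.apply(window);
    }
    if (window == SketchGlobalWindow.INSTANCE) {
      return ""; // empty, so the placeholder can be elided from the filename
    }
    if (window instanceof SketchIntervalWindow) {
      SketchIntervalWindow w = (SketchIntervalWindow) window;
      return w.start + "-" + w.end; // ISO-8601 instants, e.g. 1970-01-01T00:00:00Z
    }
    return window.toString();
  }
}
```

One argument for keeping the formatter pluggable: ISO-8601 instants contain ':' characters, which some filesystems (notably Windows) do not allow in filenames.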
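Dan's point that a single shard template cannot serve both windowed and unwindowed writes, Reuven's question about failing (or at least warning) on WWW when windowed writes is not set, and the earlier idea of a per-window default can be sketched as one template-resolution step. This is a hedged illustration only: TemplateResolverSketch, resolve and both default template strings are hypothetical and are not taken from Beam.

```java
import java.util.logging.Logger;

/**
 * Hypothetical sketch: picks a shard template for a write and validates it
 * against the write mode. Template values are illustrative assumptions.
 */
final class TemplateResolverSketch {
  private static final Logger LOG = Logger.getLogger(TemplateResolverSketch.class.getName());

  // Assumed defaults: one template per write mode, since one template cannot serve both.
  static final String UNWINDOWED_DEFAULT = "-SSSSS-of-NNNNN";
  static final String WINDOWED_DEFAULT = "-WWW-PPP-SSSSS-of-NNNNN";

  private TemplateResolverSketch() {}

  static String resolve(String userTemplate, boolean windowedWrites) {
    if (userTemplate == null) {
      // No template given: choose the default that matches the write mode.
      return windowedWrites ? WINDOWED_DEFAULT : UNWINDOWED_DEFAULT;
    }
    boolean hasWindowTokens = userTemplate.indexOf('W') >= 0 || userTemplate.indexOf('P') >= 0;
    if (hasWindowTokens && !windowedWrites) {
      // Fail fast: W/P placeholders have nothing to expand to in an unwindowed write.
      throw new IllegalArgumentException("Shard template \"" + userTemplate
          + "\" contains W/P placeholders but windowed writes are not enabled");
    }
    if (!hasWindowTokens && windowedWrites) {
      // The "vice versa" case: files from different windows may get the same name.
      LOG.warning("Shard template \"" + userTemplate
          + "\" has no W/P placeholders; windowed writes may produce colliding filenames");
    }
    return userTemplate;
  }
}
```

Failing in the first mismatch and only warning in the second mirrors Reuven's "fail, or at least warn": a leftover W/P token is certainly wrong, while a template without window tokens is only risky when several windows write the same shard numbers.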
