DefaultFilenamePolicy as currently written only accepts a single shard name template. If that template is windowed, it won't work for unwindowed writes (it will contain -WWW-PPP, or .WWW.PPP, or something.WWW.something.PPPP?). And vice versa for an unwindowed template used with windowed writes.
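To make the mismatch concrete, here is a rough Java sketch. It is not Beam's DefaultFilenamePolicy code: the class ShardTemplateSketch, its expand method, and the window/pane strings "w1" and "p0" are made up for illustration. It simply assumes that runs of W and P would be replaced by a window and pane string in the same way that runs of S and N are replaced by zero-padded numbers.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration only; not part of Beam.
public class ShardTemplateSketch {
    // A run of S, N, W or P in the template is treated as one placeholder.
    private static final Pattern PLACEHOLDER = Pattern.compile("S+|N+|W+|P+");

    /**
     * Expands a shard name template. window and pane may be null for an
     * unwindowed write, in which case their placeholders expand to "".
     */
    public static String expand(
            String template, int shard, int numShards, String window, String pane) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        int last = 0;
        while (m.find()) {
            // Copy the literal text between placeholders unchanged.
            out.append(template, last, m.start());
            String run = m.group();
            switch (run.charAt(0)) {
                case 'S': out.append(pad(shard, run.length())); break;
                case 'N': out.append(pad(numShards, run.length())); break;
                case 'W': out.append(window == null ? "" : window); break;
                default:  out.append(pane == null ? "" : pane); break;
            }
            last = m.end();
        }
        out.append(template, last, template.length());
        return out.toString();
    }

    // Zero-pads value to the width of the placeholder run.
    private static String pad(int value, int width) {
        return String.format(Locale.ROOT, "%0" + width + "d", value);
    }

    public static void main(String[] args) {
        String windowed = "-WWW-PPP-SSS-of-NNN";
        // Windowed write with a windowed template: every placeholder has a value.
        System.out.println(expand(windowed, 0, 5, "w1", "p0"));
        // Unwindowed write with the same template: W and P expand to nothing.
        System.out.println(expand(windowed, 0, 5, null, null));
        // Windowed write with an unwindowed template: window and pane are ignored.
        System.out.println(expand("-SSS-of-NNN", 0, 5, "w1", "p0"));
    }
}
```

With the windowed template, the unwindowed write ends up with dangling separators (---000-of-005). With the unwindowed template, every window and pane maps to the same file name, so a single template cannot serve both kinds of write.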
There is likely a way to fix this more easily in the File IOs themselves.

On Fri, May 12, 2017 at 4:33 AM, Jean-Baptiste Onofré <[email protected]> wrote:

> Hi Reuven,
>
> yes, it's what I have in mind: just provide the default logic in the
> existing method.
>
> Regards
> JB
>
> On 05/12/2017 12:46 PM, Reuven Lax wrote:
>
>> DefaultFilenamePolicy already contains a windowedFilename override
>> (today it throws an exception), so I don't think there's any need for
>> a new class. We can simply fill out the existing method.
>>
>> On May 12, 2017 11:34 AM, "Borisa Zivkovic" <[email protected]> wrote:
>>
>> +1 for DefaultFilenamePolicy being able to understand basic
>> windowing... probably the most user-friendly way that would cover
>> most needs... in case of special needs users can provide their own
>> policy..
>>
>> another alternative would be to have a new class called
>> DefaultWindowedFilenamePolicy in package org.apache.beam.sdk.io ...
>>
>> any of those would make it easier for Beam users..
>>
>> so, someone needs to decide how we want to do this and if you want I
>> can work on it...
>>
>> cheers
>>
>> On Fri, 12 May 2017 at 08:18 Reuven Lax <[email protected]> wrote:
>>
>>> I believe that for most windows there is a standard stringification.
>>> However I think we could allow the user to inject a window formatter
>>> for cases where there is no good default (e.g. where the window is a
>>> complicated user-defined type, and toString() isn't good enough).
>>>
>>> Alternatively, if we don't want to allow formatters, we could make
>>> DefaultFilenamePolicy work with default stringifications of
>>> well-known windows (fixed, sliding, sessions, etc.), and just use
>>> toString() for remaining window types. Users that have weird custom
>>> window types can always write their own FilenamePolicy.
>>>
>>> On Thu, May 11, 2017 at 4:24 PM, Robert Bradshaw <[email protected]> wrote:
>>>
>>>> I like the idea of WWW and PPP, assuming there is a standard enough
>>>> stringification of windows and panes. However, we may want to elide
>>>> adjacent tokens if the window is global or the pane is the only
>>>> possible (or first?) one to avoid writing things like
>>>> -0000-of-0005---.
>>>>
>>>> On Thu, May 11, 2017 at 8:47 AM, Reuven Lax <[email protected]> wrote:
>>>>
>>>>> Another idea - we can extend the existing pattern that
>>>>> DefaultFilenamePolicy understands to include windows.
>>>>>
>>>>> Today it replaces SSS with the shard, and NNN with the number of
>>>>> shards (so many templates contain -SSS-of-NNN). We could also have
>>>>> it recognize WWW and PPP, for the window and the pane respectively.
>>>>>
>>>>> I believe this would be a backwards-compatible change. We do not
>>>>> need to change any existing interfaces, we would simply be allowing
>>>>> the default policy to work on windows.
>>>>>
>>>>> On Thu, May 11, 2017 at 8:41 AM, Dan Halperin <[email protected]> wrote:
>>>>>
>>>>>> +Eugene, Reuven who reviewed and implemented this code. They may
>>>>>> have opinions.
>>>>>>
>>>>>> Note that changing the default filename policy would be
>>>>>> backwards-incompatible, so this would either need to go into 2.0.0
>>>>>> (and a new RC3) or it would not go in.
>>>>>>
>>>>>> On Thu, May 11, 2017 at 8:36 AM, Borisa Zivkovic <[email protected]> wrote:
>>>>>>
>>>>>>> great JB, thanks
>>>>>>>
>>>>>>> I do not mind working on this - let's see if anyone else has
>>>>>>> additional input.
>>>>>>>
>>>>>>> cheers
>>>>>>>
>>>>>>> On Thu, 11 May 2017 at 16:28 Jean-Baptiste Onofré <[email protected]> wrote:
>>>>>>>
>>>>>>>> Got it.
>>>>>>>>
>>>>>>>> Yes, agree, I think the PerWindowFilesPolicy could be the default,
>>>>>>>> and let the user provide their own policy if they want to.
>>>>>>>>
>>>>>>>> Regards
>>>>>>>> JB
>>>>>>>>
>>>>>>>> On 05/11/2017 05:23 PM, Borisa Zivkovic wrote:
>>>>>>>>
>>>>>>>>> Hi JB,
>>>>>>>>>
>>>>>>>>> yes I saw that thread - I also copied your code but did not want
>>>>>>>>> to pollute it with my proposal :)
>>>>>>>>>
>>>>>>>>> Well ok maybe default FilePerWindow policy for windowedWrites in
>>>>>>>>> TextIO does not make sense - not sure TBH...
>>>>>>>>>
>>>>>>>>> But would it make sense to promote a version of PerWindowFiles
>>>>>>>>> from
>>>>>>>>>
>>>>>>>>> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
>>>>>>>>>
>>>>>>>>> so that it is easier to provide some kind of PerWindowFiles
>>>>>>>>> filename policy..
>>>>>>>>>
>>>>>>>>> something like (where user does not have to write
>>>>>>>>> PerWindowFilesPolicy, it comes with Beam)
>>>>>>>>>
>>>>>>>>>     .withFilenamePolicy(PerWindowFilesPolicy.withSuffix("mySuffix"))
>>>>>>>>>     .withWindowedWrites()
>>>>>>>>>     .withNumShards(1));
>>>>>>>>>
>>>>>>>>> not sure if this was already discussed...
>>>>>>>>>
>>>>>>>>> cheers
>>>>>>>>> Borisa
>>>>>>>>>
>>>>>>>>> On Thu, 11 May 2017 at 16:15 Jean-Baptiste Onofré <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Borisa,
>>>>>>>>>>
>>>>>>>>>> You can take a look at the other thread ("Direct runner doesn't
>>>>>>>>>> seem to finalize checkpoint "quickly"").
>>>>>>>>>>
>>>>>>>>>> It's basically the same point ;)
>>>>>>>>>>
>>>>>>>>>> The default trigger (event-time) doesn't fire any data. I'm
>>>>>>>>>> investigating the element timestamp and watermark.
>>>>>>>>>>
>>>>>>>>>> I'm also playing with that, for instance:
>>>>>>>>>>
>>>>>>>>>> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
>>>>>>>>>>
>>>>>>>>>> When you use WindowedWrite, you have to provide a filename
>>>>>>>>>> policy. We could provide a default one, but not sure it will fit
>>>>>>>>>> fine (as it depends a lot on the use cases).
>>>>>>>>>>
>>>>>>>>>> Regards
>>>>>>>>>> JB
>>>>>>>>>>
>>>>>>>>>> On 05/11/2017 05:01 PM, Borisa Zivkovic wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi guys,
>>>>>>>>>>>
>>>>>>>>>>> just playing with reading data from PubSub and writing using
>>>>>>>>>>> TextIO.
>>>>>>>>>>>
>>>>>>>>>>> First thing is that it is very hard to get any output - a lot
>>>>>>>>>>> of temp files written but not always would get final files
>>>>>>>>>>> created.
>>>>>>>>>>>
>>>>>>>>>>> So, I am playing with triggers etc... If I do the following
>>>>>>>>>>>
>>>>>>>>>>>     PCollection<String> streamData = p.apply(
>>>>>>>>>>>         PubsubIO.readStrings().fromTopic("projects/" + PROJECT_NAME + "/topics/myTopic"));
>>>>>>>>>>>
>>>>>>>>>>>     streamData.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
>>>>>>>>>>>             .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(3))))
>>>>>>>>>>>             .withAllowedLateness(Duration.ZERO)
>>>>>>>>>>>             .discardingFiredPanes())
>>>>>>>>>>>         .apply(TextIO.write().to("/tmp/abc").withWindowedWrites()
>>>>>>>>>>>             .withSuffix(".suff").withNumShards(10));
>>>>>>>>>>>
>>>>>>>>>>>     p.run();
>>>>>>>>>>>
>>>>>>>>>>> I would expect to see some files in /tmp/ with final results..
>>>>>>>>>>> unless I add good triggers I usually do not get any data.. only
>>>>>>>>>>> temp files in /temp/.beam/
>>>>>>>>>>>
>>>>>>>>>>> but sometimes when data should be written I get the following
>>>>>>>>>>> exception
>>>>>>>>>>>
>>>>>>>>>>> Exception in thread "main"
>>>>>>>>>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>>>>>>>>>>> java.lang.UnsupportedOperationException: There is no default policy for windowed file output. Please provide an explicit FilenamePolicy to generate filenames.
>>>>>>>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
>>>>>>>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
>>>>>>>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
>>>>>>>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
>>>>>>>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
>>>>>>>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
>>>>>>>>>>>     at Test.main(Test.java:50)
>>>>>>>>>>>
>>>>>>>>>>> Would it make sense to change TextIO so that it does not use
>>>>>>>>>>> DefaultFilenamePolicy only - but in case there are
>>>>>>>>>>> windowedWrites and no filename policy was specified by user it
>>>>>>>>>>> could actually use custom FilePerWindow policy automatically. I
>>>>>>>>>>> believe today TextIO always expects user to specify
>>>>>>>>>>> FilenamePolicy, right?
>>>>>>>>>>>
>>>>>>>>>>> Or maybe to have FilePerWindow policy exposed as part of Beam -
>>>>>>>>>>> I believe today there are only implementations in tests and
>>>>>>>>>>> examples but nothing publicly visible, right?
>>>>>>>>>>>
>>>>>>>>>>> thanks
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>> [email protected]
>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>> Talend - http://www.talend.com
>>>>>>>>
>>>>>>>> --
>>>>>>>> Jean-Baptiste Onofré
>>>>>>>> [email protected]
>>>>>>>> http://blog.nanthrax.net
>>>>>>>> Talend - http://www.talend.com
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
