(We should probably throw an exception at construction time in the various
FileBasedSinks if you use WindowedWrites with the default filename policy,
though; that's a no-brainer, and it's backwards-compatible.)
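A minimal sketch of the construction-time check suggested above. It uses hypothetical stand-in classes (`WindowedSinkValidation`, `Builder`, `FilenamePolicy`), not Beam's actual FileBasedSink API. It rejects "windowed writes + default filename policy" inside `build()`, so the misconfiguration fails while the pipeline is being constructed instead of when it runs. Unwindowed writes without an explicit policy still build, which is what keeps such a check backwards-compatible.

```java
import java.util.Objects;

/**
 * Hypothetical sketch (not Beam's API): a sink configuration that rejects
 * windowed writes combined with the default filename policy when it is
 * built, instead of failing later at pipeline run time.
 */
public class WindowedSinkValidation {

  /** Hypothetical stand-in for a user-supplied filename policy. */
  interface FilenamePolicy {
    String filename(String windowLabel, int shard, int numShards);
  }

  static final class SinkConfig {
    final String prefix;
    final boolean windowedWrites;
    final FilenamePolicy policy; // null means "use the default policy"

    private SinkConfig(String prefix, boolean windowedWrites, FilenamePolicy policy) {
      this.prefix = prefix;
      this.windowedWrites = windowedWrites;
      this.policy = policy;
    }
  }

  static final class Builder {
    private String prefix;
    private boolean windowedWrites;
    private FilenamePolicy policy;

    Builder to(String prefix) {
      this.prefix = Objects.requireNonNull(prefix);
      return this;
    }

    Builder withWindowedWrites() {
      this.windowedWrites = true;
      return this;
    }

    Builder withFilenamePolicy(FilenamePolicy p) {
      this.policy = Objects.requireNonNull(p);
      return this;
    }

    /** Validates eagerly, so the error surfaces at construction time. */
    SinkConfig build() {
      if (windowedWrites && policy == null) {
        throw new IllegalArgumentException(
            "Windowed writes require an explicit FilenamePolicy; the default policy"
                + " does not support windowed output.");
      }
      // Unwindowed writes with the default policy are still accepted (backwards-compatible).
      return new SinkConfig(prefix, windowedWrites, policy);
    }
  }

  public static void main(String[] args) {
    try {
      new Builder().to("/tmp/abc").withWindowedWrites().build();
      System.out.println("unexpected: built without a policy");
    } catch (IllegalArgumentException e) {
      System.out.println("rejected at construction: " + e.getMessage());
    }
    SinkConfig ok = new Builder()
        .to("/tmp/abc")
        .withWindowedWrites()
        .withFilenamePolicy((w, s, n) -> "/tmp/abc-" + w + "-" + s + "-of-" + n)
        .build();
    System.out.println("built with explicit policy: " + ok.policy.filename("w0", 0, 10));
  }
}
```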

On Thu, May 11, 2017 at 8:41 AM, Dan Halperin <[email protected]> wrote:

> +Eugene and Reuven, who reviewed and implemented this code. They may have
> opinions.
>
> Note that changing the default filename policy would be
> backwards-incompatible, so this would either need to go into 2.0.0 (and a
> new RC3) or it would not go in.
>
> On Thu, May 11, 2017 at 8:36 AM, Borisa Zivkovic <[email protected]> wrote:
>
>> great JB, thanks
>>
>> I do not mind working on this - let's see if anyone else has additional
>> input.
>>
>> cheers
>>
>> On Thu, 11 May 2017 at 16:28 Jean-Baptiste Onofré <[email protected]> wrote:
>>
>> > Got it.
>> >
>> > Yes, agreed. I think the PerWindowFilesPolicy could be the default, letting
>> > the user provide their own policy if they want to.
>> >
>> > Regards
>> > JB
>> >
>> > On 05/11/2017 05:23 PM, Borisa Zivkovic wrote:
>> > > Hi JB,
>> > >
>> > > Yes, I saw that thread. I also copied your code, but did not want to
>> > > pollute it with my proposal :)
>> > >
>> > > Well, OK, maybe a default FilePerWindow policy for windowedWrites in
>> > > TextIO does not make sense; not sure, TBH...
>> > >
>> > > But would it make sense to promote a version of PerWindowFiles from
>> > > https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
>> > > so that it is easier to provide some kind of PerWindowFiles filename
>> > > policy?
>> > >
>> > >
>> > > Something like this (where the user does not have to write
>> > > PerWindowFilesPolicy; it comes with Beam):
>> > >
>> > > .withFilenamePolicy(PerWindowFilesPolicy.withSuffix("mySuffix"))
>> > > .withWindowedWrites()
>> > > .withNumShards(1));
>> > >
>> > > Not sure if this was already discussed...
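A sketch of the naming scheme a bundled per-window policy might apply. `PerWindowFileNames` and its `filename(...)` method are hypothetical; this is plain Java, not an implementation of Beam's `FilenamePolicy` interface, and the `prefix-windowStart-windowEnd-SSSSS-of-NNNNN` layout is an assumption chosen only to show that each window gets its own set of shard files.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Objects;

/**
 * Hypothetical sketch of per-window file naming. Plain Java, not Beam's
 * FilenamePolicy: it only shows how window bounds, shard index and a suffix
 * could be combined into one filename per (window, shard).
 */
public final class PerWindowFileNames {
  // UTC timestamps without colons, so names are safe on most filesystems.
  private static final DateTimeFormatter WINDOW_FORMAT =
      DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss").withZone(ZoneOffset.UTC);

  private final String suffix;

  private PerWindowFileNames(String suffix) {
    this.suffix = suffix;
  }

  /** Mirrors the withSuffix(...) factory proposed in the thread (hypothetical). */
  public static PerWindowFileNames withSuffix(String suffix) {
    return new PerWindowFileNames(Objects.requireNonNull(suffix));
  }

  /** Builds prefix-start-end-SSSSS-of-NNNNN followed by the suffix, appended as-is. */
  public String filename(String prefix, Instant windowStart, Instant windowEnd,
                         int shard, int numShards) {
    if (shard < 0 || shard >= numShards) {
      throw new IllegalArgumentException("shard " + shard + " not in [0, " + numShards + ")");
    }
    return String.format("%s-%s-%s-%05d-of-%05d%s",
        prefix, WINDOW_FORMAT.format(windowStart), WINDOW_FORMAT.format(windowEnd),
        shard, numShards, suffix);
  }

  public static void main(String[] args) {
    Instant start = Instant.parse("2017-05-11T15:01:00Z");
    Instant end = start.plusSeconds(5); // a 5-second fixed window
    PerWindowFileNames policy = PerWindowFileNames.withSuffix(".mySuffix");
    System.out.println(policy.filename("/tmp/abc", start, end, 0, 1));
  }
}
```

Running `main` prints `/tmp/abc-20170511T150100-20170511T150105-00000-of-00001.mySuffix`. Because the window bounds are part of every name, two windows can never overwrite each other's shards, which is the property the default (unwindowed) naming lacks.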
>> > >
>> > > cheers
>> > > Borisa
>> > >
>> > >
>> > > On Thu, 11 May 2017 at 16:15 Jean-Baptiste Onofré <[email protected]> wrote:
>> > >
>> > >> Hi Borisa,
>> > >>
>> > >> You can take a look at the other thread ("Direct runner doesn't seem to
>> > >> finalize checkpoint "quickly"").
>> > >>
>> > >> It's basically the same point ;)
>> > >>
>> > >> The default (event-time) trigger doesn't fire, so no data is emitted.
>> > >> I'm investigating the element timestamps and the watermark.
>> > >>
>> > >> I'm also playing with that, for instance:
>> > >>
>> > >> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
>> > >>
>> > >> When you use windowed writes, you have to provide a filename policy. We
>> > >> could provide a default one, but I'm not sure it would fit well, as it
>> > >> depends a lot on the use case.
>> > >>
>> > >> Regards
>> > >> JB
>> > >>
>> > >> On 05/11/2017 05:01 PM, Borisa Zivkovic wrote:
>> > >>> Hi guys,
>> > >>>
>> > >>> just playing with reading data from PubSub and writing using TextIO.
>> > >>>
>> > >>> The first thing is that it is very hard to get any output: a lot of
>> > >>> temp files are written, but the final files are not always created.
>> > >>>
>> > >>> So I am playing with triggers, etc. If I do the following:
>> > >>>
>> > >>> PCollection<String> streamData = p.apply(
>> > >>>         PubsubIO.readStrings().fromTopic("projects/" + PROJECT_NAME + "/topics/myTopic"));
>> > >>>
>> > >>> streamData.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
>> > >>>             .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
>> > >>>                 .plusDelayOf(Duration.standardSeconds(3))))
>> > >>>             .withAllowedLateness(Duration.ZERO)
>> > >>>             .discardingFiredPanes())
>> > >>>     .apply(TextIO.write().to("/tmp/abc").withWindowedWrites()
>> > >>>     .withSuffix(".suff").withNumShards(10));
>> > >>>
>> > >>> p.run();
>> > >>>
>> > >>> I would expect to see some files in /tmp/ with final results. Unless I
>> > >>> add good triggers, I usually do not get any data, only temp files in
>> > >>> /temp/.beam/
>> > >>>
>> > >>> But sometimes, when data should be written, I get the following exception:
>> > >>>
>> > >>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.UnsupportedOperationException: There is no default policy for windowed file output. Please provide an explicit FilenamePolicy to generate filenames.
>> > >>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
>> > >>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
>> > >>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
>> > >>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
>> > >>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
>> > >>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
>> > >>>     at Test.main(Test.java:50)
>> > >>>
>> > >>>
>> > >>> Would it make sense to change TextIO so that it does not use only
>> > >>> DefaultFilenamePolicy? When there are windowedWrites and no filename
>> > >>> policy was specified by the user, it could automatically use a custom
>> > >>> FilePerWindow policy. I believe today TextIO always expects the user to
>> > >>> specify a FilenamePolicy for windowed writes, right?
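A sketch of the fallback proposed above, with hypothetical names (`PolicyFallback`, `Namer`) and illustrative filename templates rather than Beam's. An explicit user policy always wins; otherwise windowed writes get a per-window default, and unwindowed writes keep a single set of shards for the whole output.

```java
import java.util.Optional;

/**
 * Hypothetical sketch of the proposed fallback: user policy first, then a
 * per-window default for windowed writes, then the unwindowed default.
 * Names and templates are illustrative, not Beam's.
 */
public final class PolicyFallback {

  /** Illustrative stand-in for a filename policy. */
  @FunctionalInterface
  interface Namer {
    String name(String windowLabel, int shard, int numShards);
  }

  static Namer resolve(String prefix, String suffix, boolean windowedWrites,
                       Optional<Namer> userPolicy) {
    if (userPolicy.isPresent()) {
      return userPolicy.get(); // an explicit choice always wins
    }
    if (windowedWrites) {
      // Per-window default: the window label keeps names unique across windows.
      return (window, shard, n) ->
          String.format("%s-%s-%05d-of-%05d%s", prefix, window, shard, n, suffix);
    }
    // Unwindowed default: one set of shards for the whole output; the window label is ignored.
    return (window, shard, n) ->
        String.format("%s-%05d-of-%05d%s", prefix, shard, n, suffix);
  }

  public static void main(String[] args) {
    Namer windowed = resolve("/tmp/abc", ".suff", true, Optional.empty());
    System.out.println(windowed.name("w1", 2, 10)); // /tmp/abc-w1-00002-of-00010.suff
  }
}
```

Checking the explicit policy first means such a fallback would not change pipelines that already configure a policy; as Dan points out in this thread, changing the default itself would still be backwards-incompatible.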
>> > >>>
>> > >>> Or maybe have a FilePerWindow policy exposed as part of Beam? I believe
>> > >>> today there are only implementations in tests and examples, but nothing
>> > >>> publicly visible, right?
>> > >>>
>> > >>>
>> > >>>
>> > >>> thanks
>> > >>>
>> > >>
>> > >> --
>> > >> Jean-Baptiste Onofré
>> > >> [email protected]
>> > >> http://blog.nanthrax.net
>> > >> Talend - http://www.talend.com
>> > >>
>> > >
>> >
>> >
>>
>
>
