great JB, thanks

I do not mind working on this - let's see if anyone else has additional
input.

cheers

On Thu, 11 May 2017 at 16:28 Jean-Baptiste Onofré <[email protected]> wrote:

> Got it.
>
> Yes, agreed. I think the PerWindowFilesPolicy could be the default, letting
> users provide their own policy if they want to.
>
> Regards
> JB
>
> On 05/11/2017 05:23 PM, Borisa Zivkovic wrote:
> > Hi JB,
> >
> > yes, I saw that thread. I also copied your code but did not want to
> > pollute it with my proposal :)
> >
> > Well, OK, maybe a default FilePerWindow policy for windowedWrites in TextIO
> > does not make sense; I'm not sure, TBH...
> >
> > But would it make sense to promote a version of PerWindowFiles from
> > https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
> > so that it is easier to provide a PerWindowFiles filename policy?
> >
> >
> > Something like this (where the user does not have to write
> > PerWindowFilesPolicy, because it comes with Beam):
> >
> >
> >
> > .withFilenamePolicy(PerWindowFilesPolicy.withSuffix("mySuffix"))
> > .withWindowedWrites()
> > .withNumShards(1));
> >
> > not sure if this was already discussed...
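To make the proposal concrete, here is a plain-Java sketch of the kind of file name a per-window policy might generate. It is not Beam code: `PerWindowFileNaming` and `windowedFilename` are hypothetical names, the window is passed as epoch-millisecond bounds rather than as Beam's `IntervalWindow`, and the zero-padded `-of-` shard suffix is an assumption about the naming scheme.

```java
// Hypothetical helper, not part of Beam: builds names of the form
// <prefix>-<windowStartMillis>-<windowEndMillis>-<shard>-of-<numShards><suffix>
class PerWindowFileNaming {

    static String windowedFilename(String prefix, long windowStartMillis, long windowEndMillis,
                                   int shard, int numShards, String suffix) {
        // Zero-pad shard numbers so that files sort naturally, e.g. "00003-of-00010".
        return String.format("%s-%d-%d-%05d-of-%05d%s",
                prefix, windowStartMillis, windowEndMillis, shard, numShards, suffix);
    }

    public static void main(String[] args) {
        // A 5-second fixed window [0, 5000) ms, shard 0 of 1, suffix ".suff".
        System.out.println(windowedFilename("/tmp/abc", 0L, 5000L, 0, 1, ".suff"));
    }
}
```

Running `main` prints `/tmp/abc-0-5000-00000-of-00001.suff`. Putting the window bounds in the name is what keeps files from different windows from colliding.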
> >
> > cheers
> > Borisa
> >
> >
> > On Thu, 11 May 2017 at 16:15 Jean-Baptiste Onofré <[email protected]> wrote:
> >
> >> Hi Borisa,
> >>
> >> You can take a look at the other thread ("Direct runner doesn't seem to
> >> finalize checkpoint "quickly"").
> >>
> >> It's basically the same point ;)
> >>
> >> The default trigger (event-time) doesn't emit any data. I'm investigating
> >> the element timestamps and the watermark.
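As a rough illustration of why nothing shows up with the default trigger: Beam's default trigger emits a window's pane once the watermark passes the end of that window, so a window whose end the watermark never reaches produces no output. The model below is a hypothetical plain-Java sketch (the class and method names are made up), assuming fixed windows aligned to epoch 0 and ignoring late data.

```java
import java.util.ArrayList;
import java.util.List;

// Rough model (not Beam code) of fixed windows plus the default event-time
// trigger: a window's pane is emitted only once the watermark reaches its end.
class DefaultTriggerModel {

    // Start of the fixed window containing tsMillis (windows aligned to epoch 0).
    static long windowStart(long tsMillis, long sizeMillis) {
        return tsMillis - Math.floorMod(tsMillis, sizeMillis);
    }

    // Start times of the windows (among those holding the given elements) whose
    // end is at or before the watermark, i.e. windows that would have fired.
    static List<Long> firedWindows(long[] timestamps, long sizeMillis, long watermarkMillis) {
        List<Long> fired = new ArrayList<>();
        for (long ts : timestamps) {
            long start = windowStart(ts, sizeMillis);
            long end = start + sizeMillis;
            if (end <= watermarkMillis && !fired.contains(start)) {
                fired.add(start);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] ts = {1000, 4000, 6000, 12000};
        // With 5 s windows and the watermark at 10 s, windows [0,5s) and [5s,10s) fire.
        System.out.println(firedWindows(ts, 5000, 10000));
        // If the watermark is stuck at 0, nothing fires at all.
        System.out.println(firedWindows(ts, 5000, 0));
    }
}
```

The first call prints `[0, 5000]` and the second prints `[]`, which mirrors the "no output at all" symptom when the watermark does not advance.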
> >>
> >> I'm also playing with that, for instance:
> >>
> >> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
> >>
> >> When you use WindowedWrite, you have to provide a filename policy. We
> >> could provide a default one, but I'm not sure it would fit well, as it
> >> depends a lot on the use case.
> >>
> >> Regards
> >> JB
> >>
> >> On 05/11/2017 05:01 PM, Borisa Zivkovic wrote:
> >>> Hi guys,
> >>>
> >>> I'm just playing with reading data from PubSub and writing it using TextIO.
> >>>
> >>> The first thing is that it is very hard to get any output: a lot of temp
> >>> files are written, but the final files are not always created.
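One likely reason for seeing temp files without final files: Beam's file sinks write to temporary files first and only move them to their final names in a separate finalize step, so if that step fails, only the temp files remain. The sketch below shows that generic write-then-rename pattern with `java.nio.file`; it is a simplified illustration, not `FileBasedSink`'s actual code, and the class, directory, and file names are made up.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Generic write-to-temp-then-rename pattern (simplified illustration only).
class TempThenFinalize {

    // Step 1: write a shard's contents into a temp directory.
    static Path writeShard(Path tempDir, String shardName, String contents) throws IOException {
        Files.createDirectories(tempDir);
        Path temp = tempDir.resolve(shardName);
        Files.write(temp, contents.getBytes(StandardCharsets.UTF_8));
        return temp;
    }

    // Step 2: move the temp file to its final name. If this step never runs or
    // fails (e.g. no final filename can be computed), only the temp file exists.
    static Path finalizeShard(Path temp, Path finalFile) throws IOException {
        return Files.move(temp, finalFile, StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("beam-sketch");
        Path temp = writeShard(base.resolve(".temp"), "shard-0", "hello\n");
        Path finalFile = finalizeShard(temp, base.resolve("abc-00000-of-00001.suff"));
        // After finalization the temp file is gone and the final file exists.
        System.out.println(Files.exists(temp) + " " + Files.exists(finalFile));
    }
}
```

Running `main` prints `false true`.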
> >>>
> >>> So I am playing with triggers, etc. If I do the following:
> >>>
> >>> PCollection<String> streamData = p.apply(
> >>>     PubsubIO.readStrings().fromTopic("projects/" + PROJECT_NAME + "/topics/myTopic"));
> >>>
> >>> streamData
> >>>     .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
> >>>         .triggering(Repeatedly.forever(
> >>>             AfterProcessingTime.pastFirstElementInPane()
> >>>                 .plusDelayOf(Duration.standardSeconds(3))))
> >>>         .withAllowedLateness(Duration.ZERO)
> >>>         .discardingFiredPanes())
> >>>     .apply(TextIO.write().to("/tmp/abc").withWindowedWrites()
> >>>         .withSuffix(".suff").withNumShards(10));
> >>>
> >>> p.run();
> >>>
> >>> I would expect to see some files in /tmp/ with the final results. Unless
> >>> I add good triggers, I usually do not get any data, only temp files in
> >>> /temp/.beam/
> >>>
> >>> But sometimes, when data should be written, I get the following exception:
> >>>
> >>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> >>> java.lang.UnsupportedOperationException: There is no default policy for
> >>> windowed file output. Please provide an explicit FilenamePolicy to generate
> >>> filenames.
> >>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
> >>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
> >>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
> >>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> >>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
> >>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
> >>>     at Test.main(Test.java:50)
> >>>
> >>>
> >>> Would it make sense to change TextIO so that it does not rely only on
> >>> DefaultFilenamePolicy? When windowedWrites is enabled and no filename
> >>> policy was specified by the user, it could automatically use a custom
> >>> FilePerWindow policy. I believe that today TextIO always expects the user
> >>> to specify a FilenamePolicy, right?
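A minimal sketch of the fallback proposed above, using hypothetical stand-in types (`FilenamePolicySketch`, `PolicyResolution`) rather than Beam's real `FilenamePolicy`: a user-supplied policy always wins; otherwise the default depends on whether windowed writes are enabled, instead of throwing. The two default name formats are assumptions, loosely modelled on a `-SSSSS-of-NNNNN` shard template.

```java
// Hypothetical stand-in for a filename policy; not Beam's FilenamePolicy API.
interface FilenamePolicySketch {
    String filename(String prefix, String windowLabel, int shard, int numShards, String suffix);
}

class PolicyResolution {

    // Stand-in for today's unwindowed default: ignores the window entirely,
    // so files from different windows would collide.
    static final FilenamePolicySketch UNWINDOWED_DEFAULT =
        (prefix, window, shard, n, suffix) ->
            String.format("%s-%05d-of-%05d%s", prefix, shard, n, suffix);

    // Stand-in for a per-window default: the window label becomes part of the name.
    static final FilenamePolicySketch PER_WINDOW_DEFAULT =
        (prefix, window, shard, n, suffix) ->
            String.format("%s-%s-%05d-of-%05d%s", prefix, window, shard, n, suffix);

    // A user-supplied policy always wins; otherwise pick a default based on
    // whether windowed writes are enabled, instead of failing.
    static FilenamePolicySketch resolve(FilenamePolicySketch userPolicy, boolean windowedWrites) {
        if (userPolicy != null) {
            return userPolicy;
        }
        return windowedWrites ? PER_WINDOW_DEFAULT : UNWINDOWED_DEFAULT;
    }

    public static void main(String[] args) {
        FilenamePolicySketch policy = resolve(null, true);
        System.out.println(policy.filename("/tmp/abc", "0-5000", 3, 10, ".suff"));
    }
}
```

Running `main` prints `/tmp/abc-0-5000-00003-of-00010.suff`. Keeping the user's policy as the first check preserves the "let the user provide their own policy" behaviour discussed in the thread.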
> >>>
> >>> Or maybe we could expose a FilePerWindow policy as part of Beam? I believe
> >>> that today there are only implementations in tests and examples, but
> >>> nothing publicly visible, right?
> >>>
> >>>
> >>>
> >>> thanks
> >>>
> >>
> >> --
> >> Jean-Baptiste Onofré
> >> [email protected]
> >> http://blog.nanthrax.net
> >> Talend - http://www.talend.com
> >>
> >
>
>
