Hi Reuven,

yes, that's what I have in mind: just provide the default logic in the existing method.

Regards
JB

On 05/12/2017 12:46 PM, Reuven Lax wrote:
DefaultFilenamePolicy already contains a windowedFilename override (today
it throws an exception), so I don't think there's any need for a new class.
We can simply fill out the existing method.
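Below is a minimal plain-Java sketch of what "filling out the existing method" could look like. `FilenamePolicySketch` and `DefaultPolicySketch` are hypothetical stand-ins with simplified signatures; Beam's real `FilenamePolicy` methods take different parameters. The five-digit shard numbering and the position of the window string are also assumptions.

```java
// Hypothetical, simplified stand-in for a filename policy; not Beam's API.
abstract class FilenamePolicySketch {
    abstract String unwindowedFilename(int shard, int numShards);

    abstract String windowedFilename(int shard, int numShards, String window);
}

// Hypothetical default policy. Before the proposed change, windowedFilename
// would simply throw UnsupportedOperationException; here it is filled in.
final class DefaultPolicySketch extends FilenamePolicySketch {
    private final String prefix;
    private final String suffix;

    DefaultPolicySketch(String prefix, String suffix) {
        this.prefix = prefix;
        this.suffix = suffix;
    }

    @Override
    String unwindowedFilename(int shard, int numShards) {
        // Unchanged behavior, e.g. /tmp/abc-00002-of-00010.txt
        return String.format("%s-%05d-of-%05d%s", prefix, shard, numShards, suffix);
    }

    @Override
    String windowedFilename(int shard, int numShards, String window) {
        // Default logic instead of an exception: put the window string
        // between the prefix and the shard numbers.
        return String.format("%s-%s-%05d-of-%05d%s", prefix, window, shard, numShards, suffix);
    }
}
```

Only the previously throwing method changes, so unwindowed file names stay exactly as before.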

On May 12, 2017 11:34 AM, "Borisa Zivkovic" <[email protected]> wrote:

+1 for DefaultFilenamePolicy being able to understand basic windowing. It's
probably the most user-friendly way, and it would cover most needs; in case
of special needs, users can provide their own policy.

Another alternative would be to have a new class called
DefaultWindowedFilenamePolicy in the package org.apache.beam.sdk.io.

Either of those would make things easier for Beam users.

So someone needs to decide how we want to do this; if you want, I can
work on it.

cheers

On Fri, 12 May 2017 at 08:18 Reuven Lax <[email protected]> wrote:

I believe that for most windows there is a standard stringification.
However, I think we could allow the user to inject a window formatter for
cases where there is no good default (e.g. where the window is a
complicated user-defined type and toString() isn't good enough).

Alternatively, if we don't want to allow formatters, we could make
DefaultFilenamePolicy work with default stringifications of well-known
windows (fixed, sliding, sessions, etc.), and just use toString() for the
remaining window types. Users that have weird custom window types can
always write their own FilenamePolicy.
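To make the fallback idea concrete, here is a minimal plain-Java sketch. The window classes (`DemoIntervalWindow`, `DemoGlobalWindow`, `DemoCustomWindow`) and the `WindowFormatter` injection point are hypothetical, not Beam APIs. Well-known window types get a standard string, anything else falls back to `toString()`, and an injected formatter overrides both.

```java
import java.time.Instant;
import java.util.function.Function;

// Hypothetical stand-ins for window types; these are NOT Beam's classes.
interface DemoWindow {}

final class DemoIntervalWindow implements DemoWindow {
    final Instant start;
    final Instant end;

    DemoIntervalWindow(Instant start, Instant end) {
        this.start = start;
        this.end = end;
    }
}

final class DemoGlobalWindow implements DemoWindow {}

final class DemoCustomWindow implements DemoWindow {
    final String key;

    DemoCustomWindow(String key) {
        this.key = key;
    }

    @Override
    public String toString() {
        return "custom(" + key + ")";
    }
}

// Hypothetical formatter: a user-supplied function wins; otherwise known
// window types get a standard string and everything else uses toString().
final class WindowFormatter {
    private final Function<DemoWindow, String> userFormatter; // may be null

    WindowFormatter(Function<DemoWindow, String> userFormatter) {
        this.userFormatter = userFormatter;
    }

    String format(DemoWindow w) {
        if (userFormatter != null) {
            return userFormatter.apply(w);
        }
        if (w instanceof DemoIntervalWindow) {
            DemoIntervalWindow iw = (DemoIntervalWindow) w;
            return iw.start + "-" + iw.end; // ISO-8601 start and end
        }
        if (w instanceof DemoGlobalWindow) {
            return "global";
        }
        return w.toString(); // fallback for custom window types
    }
}
```

ISO-8601 instants contain colons, which some filesystems reject, so a real default might prefer a different format; that choice is left open here.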

On Thu, May 11, 2017 at 4:24 PM, Robert Bradshaw <[email protected]> wrote:

I like the idea of WWW and PPP, assuming there is a sufficiently standard
stringification of windows and panes. However, we may want to elide
adjacent tokens if the window is global or the pane is the only
possible (or first?) one, to avoid writing things like
-0000-of-0005---.
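One way to read the elision suggestion, sketched in plain Java: build the name from segments and drop empty ones together with their separators. The helper `SegmentJoiner` and the conventions that the global window and an only-firing pane produce empty tokens are assumptions for illustration.

```java
import java.util.StringJoiner;

// Hypothetical helper: joins filename segments with '-', skipping empty ones,
// so an empty window or pane token does not leave a run of dashes behind.
final class SegmentJoiner {
    static String join(String... segments) {
        StringJoiner joiner = new StringJoiner("-");
        for (String s : segments) {
            if (s != null && !s.isEmpty()) {
                joiner.add(s);
            }
        }
        return joiner.toString();
    }

    // Window token: empty for the global window (assumed convention).
    static String windowToken(boolean isGlobal, String windowString) {
        return isGlobal ? "" : windowString;
    }

    // Pane token: empty when the pane is the only firing (assumed convention).
    static String paneToken(boolean isOnlyFiring, long paneIndex) {
        return isOnlyFiring ? "" : "pane" + paneIndex;
    }
}
```

With a global window and a single pane this yields `out-00000-of-00005` rather than `out-00000-of-00005--`.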

On Thu, May 11, 2017 at 8:47 AM, Reuven Lax <[email protected]> wrote:
Another idea: we can extend the existing pattern that
DefaultFilenamePolicy understands to include windows.

Today it replaces SSS with the shard, and NNN with the number of shards
(so many templates contain -SSS-of-NNN). We could also have it recognize
WWW and PPP, for the window and the pane respectively.
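A rough sketch of the extended template substitution, in plain Java. It assumes that runs of `S` and `N` are zero-padded to the length of the run, and that a run of `W` or `P` is replaced by a window or pane string supplied by the caller. `TemplateSketch.constructName` is a hypothetical helper, not the actual `DefaultFilenamePolicy` code.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: S/N runs are the existing shard-index / shard-count
// placeholders; W and P runs are the proposed window and pane placeholders.
final class TemplateSketch {
    private static final Pattern RUN = Pattern.compile("S+|N+|W+|P+");

    static String constructName(String prefix, String template, String suffix,
                                int shard, int numShards, String window, String pane) {
        Matcher m = RUN.matcher(template);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            String run = m.group();
            String replacement;
            switch (run.charAt(0)) {
                case 'S': replacement = pad(shard, run.length()); break;
                case 'N': replacement = pad(numShards, run.length()); break;
                case 'W': replacement = window; break;
                default:  replacement = pane; break; // a run of 'P'
            }
            m.appendReplacement(sb, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(sb);
        // Only the template is substituted; prefix and suffix are copied as-is.
        return prefix + sb + suffix;
    }

    // Zero-pads value to the given width, e.g. pad(3, 3) -> "003".
    private static String pad(int value, int width) {
        return String.format("%0" + width + "d", value);
    }
}
```

Applying the substitution only to the template part means a prefix such as `/tmp/SNOW` is never rewritten, and templates without WWW or PPP produce the same names as before.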

I believe this would be a backwards-compatible change. We do not need to
change any existing interfaces; we would simply be allowing the default
policy to work on windows.

On Thu, May 11, 2017 at 8:41 AM, Dan Halperin <[email protected]> wrote:

+Eugene and Reuven, who reviewed and implemented this code. They may have
opinions.

Note that changing the default filename policy would be
backwards-incompatible, so this would either need to go into 2.0.0 (and a
new RC3) or it would not go in.

On Thu, May 11, 2017 at 8:36 AM, Borisa Zivkovic <[email protected]> wrote:

Great, JB, thanks.

I do not mind working on this; let's see if anyone else has additional
input.

cheers

On Thu, 11 May 2017 at 16:28 Jean-Baptiste Onofré <[email protected]> wrote:

Got it.

Yes, agreed. I think the PerWindowFilesPolicy could be the default, and
users could provide their own policy if they want to.

Regards
JB

On 05/11/2017 05:23 PM, Borisa Zivkovic wrote:
Hi JB,

Yes, I saw that thread. I also copied your code but did not want to
pollute it with my proposal :)

Well, OK, maybe a default FilePerWindow policy for windowedWrites in
TextIO does not make sense; not sure, TBH.

But would it make sense to promote a version of PerWindowFiles from

https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java

so that it is easier to provide some kind of PerWindowFiles filename
policy?


Something like this (where the user does not have to write
PerWindowFilesPolicy because it comes with Beam):

.withFilenamePolicy(PerWindowFilesPolicy.withSuffix("mySuffix"))
.withWindowedWrites()
.withNumShards(1));

Not sure if this was already discussed.
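A minimal sketch of the kind of naming a bundled per-window policy might produce, written in plain Java with `java.time` rather than Beam types. `PerWindowFilesSketch`, its `of(prefix, suffix)` factory, and the `<prefix>-<start>-<end>-<shard>-of-<numShards><suffix>` layout are all hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical per-window naming helper; not part of Beam.
final class PerWindowFilesSketch {
    // Compact UTC timestamps without colons, so names are filesystem-friendly.
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss").withZone(ZoneOffset.UTC);

    private final String prefix;
    private final String suffix;

    private PerWindowFilesSketch(String prefix, String suffix) {
        this.prefix = prefix;
        this.suffix = suffix;
    }

    static PerWindowFilesSketch of(String prefix, String suffix) {
        return new PerWindowFilesSketch(prefix, suffix);
    }

    // One file name per (window, shard) pair.
    String filename(Instant windowStart, Instant windowEnd, int shard, int numShards) {
        return String.format("%s-%s-%s-%d-of-%d%s",
            prefix, FORMAT.format(windowStart), FORMAT.format(windowEnd),
            shard, numShards, suffix);
    }
}
```

For a 5-second window starting at the epoch, shard 0 of 1, this gives `/tmp/abc-19700101T000000-19700101T000005-0-of-1.suff`.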

cheers
Borisa


On Thu, 11 May 2017 at 16:15 Jean-Baptiste Onofré <[email protected]> wrote:

Hi Borisa,

You can take a look at the other thread ("Direct runner doesn't seem to
finalize checkpoint "quickly"").

It's basically the same point ;)

The default (event-time) trigger doesn't emit any data. I'm investigating
the element timestamps and the watermark.
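A simplified model of that behavior, assuming epoch-aligned fixed windows and a default trigger that fires once the watermark reaches the end of the window. It uses `java.time` types and the hypothetical `EventTimeModel` class; it is not Beam code.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified model (not Beam code): a fixed window's on-time pane is only
// emitted once the watermark reaches the end of that window.
final class EventTimeModel {
    // Start of the epoch-aligned fixed window containing the timestamp.
    static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMillis = size.toMillis();
        long ts = timestamp.toEpochMilli();
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMillis));
    }

    // True once the watermark is at or past the end of the element's window.
    static boolean onTimeFires(Instant timestamp, Duration size, Instant watermark) {
        Instant end = windowStart(timestamp, size).plus(size);
        return !watermark.isBefore(end);
    }
}
```

If the watermark never advances past a window's end, `onTimeFires` stays false for that window, which would be consistent with seeing only temp files and no final output.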

I'm also playing with that, for instance:

https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java

When you use windowed writes, you have to provide a filename policy. We
could provide a default one, but I'm not sure it would fit well, as it
depends a lot on the use case.

Regards
JB

On 05/11/2017 05:01 PM, Borisa Zivkovic wrote:
Hi guys,

I'm just playing with reading data from PubSub and writing it using TextIO.

The first thing is that it is very hard to get any output: a lot of temp
files are written, but the final files are not always created.

So I am playing with triggers etc. If I do the following:

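import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// p (the Pipeline) and PROJECT_NAME are defined elsewhere.
PCollection<String> streamData = p.apply(
    PubsubIO.readStrings().fromTopic("projects/" + PROJECT_NAME + "/topics/myTopic"));

streamData
    // 5-second fixed windows; fire 3 s (processing time) after the first
    // element of each pane, repeatedly, discarding already-fired data.
    .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
        .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(3))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply(TextIO.write().to("/tmp/abc").withWindowedWrites()
        .withSuffix(".suff").withNumShards(10));

p.run();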

I would expect to see some files in /tmp/ with the final results. Unless I
add good triggers, I usually do not get any data, only temp files in
/temp/.beam/.

But sometimes, when data should be written, I get the following exception:

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.UnsupportedOperationException: There is no default policy for windowed file output. Please provide an explicit FilenamePolicy to generate filenames.
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
    at Test.main(Test.java:50)


Would it make sense to change TextIO so that it does not only use
DefaultFilenamePolicy, but, when windowed writes are enabled and the user
has not specified a filename policy, automatically uses a custom
FilePerWindow policy instead? I believe that today TextIO always expects
the user to specify a FilenamePolicy for windowed writes, right?
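The proposed fallback can be sketched as a small resolution step. `NamingPolicy` and `WriteConfigSketch.resolve` are hypothetical names with simplified signatures, and the per-window name layout is an assumption. The point is only the order of precedence: an explicit user policy first, then a per-window default when windowed writes are on, then the unwindowed default.

```java
// Hypothetical stand-in for a filename policy; not Beam's FilenamePolicy.
interface NamingPolicy {
    String name(int shard, int numShards, String window);
}

// Hypothetical write configuration: when windowed writes are enabled and the
// user supplied no policy, fall back to a per-window policy instead of failing.
final class WriteConfigSketch {
    static NamingPolicy resolve(String prefix, boolean windowedWrites, NamingPolicy userPolicy) {
        if (userPolicy != null) {
            return userPolicy; // an explicit user policy always wins
        }
        if (windowedWrites) {
            // Assumed per-window default: <prefix>-<window>-<shard>-of-<numShards>
            return (shard, numShards, window) ->
                String.format("%s-%s-%d-of-%d", prefix, window, shard, numShards);
        }
        // Unwindowed default ignores the window argument.
        return (shard, numShards, window) ->
            String.format("%s-%d-of-%d", prefix, shard, numShards);
    }
}
```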

Or maybe we could expose a FilePerWindow policy as part of Beam? I believe
that today there are only implementations in tests and examples, but
nothing publicly visible, right?



thanks


--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com


