Hi Borisa,

You can take a look at the other thread ("Direct runner doesn't seem to finalize checkpoint "quickly"").

It's basically the same point ;)

The default trigger (event time) doesn't emit any data. I'm investigating the element timestamps and the watermark.
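To make the event-time point concrete, here is a simplified model in Java. It is not Beam code, and the class and method names are hypothetical. It assigns a timestamp to a 5-second fixed window (offset 0, matching FixedWindows.of(Duration.standardSeconds(5)) in the quoted pipeline) and checks whether a watermark-based trigger could fire for that window. The key assumption is that such a trigger only fires once the watermark reaches the end of the window, so a watermark that never advances produces no output at all.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Simplified model (hypothetical, not Beam's implementation) of fixed-window
 * assignment and of when a watermark-based trigger may fire for a window.
 */
final class FixedWindowModel {
    private FixedWindowModel() {}

    /** Start of the fixed window (offset 0) that contains the timestamp. */
    static long windowStartMillis(long timestampMillis, long sizeMillis) {
        // floorMod keeps this correct for timestamps before the epoch as well.
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** Exclusive end of that window: windows are half-open, [start, end). */
    static long windowEndMillis(long timestampMillis, long sizeMillis) {
        return windowStartMillis(timestampMillis, sizeMillis) + sizeMillis;
    }

    /**
     * Assumed firing rule: the pane becomes eligible only once the watermark
     * has reached the end of the window. A stalled watermark means no output.
     */
    static boolean watermarkTriggerCanFire(long windowEndMillis, long watermarkMillis) {
        return watermarkMillis >= windowEndMillis;
    }

    public static void main(String[] args) {
        long size = Duration.ofSeconds(5).toMillis();
        long ts = Instant.parse("2017-05-11T17:01:07Z").toEpochMilli();
        long start = windowStartMillis(ts, size);
        long end = windowEndMillis(ts, size);
        System.out.println("window [" + Instant.ofEpochMilli(start) + ", " + Instant.ofEpochMilli(end) + ")");
        // Watermark still inside the window: nothing is emitted yet.
        System.out.println("watermark at element time: " + watermarkTriggerCanFire(end, ts));
        // Watermark has reached the window end: the pane can fire.
        System.out.println("watermark at window end: " + watermarkTriggerCanFire(end, end));
    }
}
```

In this model an element stamped 17:01:07 lands in the window [17:01:05, 17:01:10), and nothing can be emitted for it until the watermark gets to 17:01:10.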

I'm also playing with that, for instance:

https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java

When you use windowed writes (withWindowedWrites()), you have to provide a filename policy. We could provide a default one, but I'm not sure it would fit well, as it depends a lot on the use case.
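As an illustration of what a windowed filename policy has to compute, here is a small Java sketch. It deliberately does not use Beam's FilenamePolicy API; PerWindowNames and windowedFilename are hypothetical names, and the naming scheme is an assumption. It builds one name per window, pane and shard from the prefix, suffix and shard count of the quoted pipeline (/tmp/abc, .suff, 10 shards). The pane index is included on the assumption that a repeated trigger can fire several panes for the same window, which would otherwise overwrite each other.

```java
import java.time.Instant;
import java.util.Locale;

/**
 * Hypothetical helper, not Beam's FilenamePolicy API: derives a distinct
 * output name per window, pane and shard, which is the job a windowed
 * filename policy has to do.
 */
final class PerWindowNames {
    private PerWindowNames() {}

    static String windowedFilename(String prefix, Instant windowStart, Instant windowEnd,
                                   long paneIndex, int shardIndex, int numShards, String suffix) {
        if (numShards <= 0 || shardIndex < 0 || shardIndex >= numShards) {
            throw new IllegalArgumentException(
                "shard " + shardIndex + " is outside [0, " + numShards + ")");
        }
        // Window bounds separate windows; the pane index separates repeated
        // firings of the same window; the shard template separates shards.
        return String.format(Locale.ROOT, "%s-%s-%s-pane-%d-%05d-of-%05d%s",
            prefix, windowStart, windowEnd, paneIndex, shardIndex, numShards, suffix);
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2017-05-11T17:01:05Z");
        System.out.println(windowedFilename("/tmp/abc", start, start.plusSeconds(5), 0, 3, 10, ".suff"));
    }
}
```

The ISO-8601 timestamps contain colons, which some file systems (Windows in particular) reject, so a real policy would probably want a different timestamp format. This is one reason a single default is hard to get right for every use case.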

Regards
JB

On 05/11/2017 05:01 PM, Borisa Zivkovic wrote:
Hi guys,

Just playing with reading data from PubSub and writing it using TextIO.

The first thing is that it is very hard to get any output: a lot of temp files
are written, but the final files are not always created.

So I am playing with triggers etc. If I do the following:

PCollection<String> streamData = p.apply(
    PubsubIO.readStrings().fromTopic("projects/" + PROJECT_NAME + "/topics/myTopic"));

streamData
    .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(3))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply(TextIO.write()
        .to("/tmp/abc")
        .withWindowedWrites()
        .withSuffix(".suff")
        .withNumShards(10));

p.run();

I would expect to see some files in /tmp/ with the final results. Unless I add
good triggers, I usually do not get any data, only temp files in
/temp/.beam/
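The trigger in the pipeline above, Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(3s)) with discardingFiredPanes(), can be pictured with the following simplified Java model. It is a hypothetical sketch, not Beam's implementation, and it covers a single window only. It ignores the watermark and window expiry entirely. The first element of a pane arms a timer 3 seconds later in processing time. When the timer fires, the pane is emitted and discarded, and the next element starts a new pane. Counting an element that arrives exactly at the firing time in the current pane is a modelling choice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical model (not Beam code) of a repeated processing-time trigger
 * with discarding panes, for ONE window, ignoring watermark and expiry.
 */
final class ProcessingTimePanes {
    private ProcessingTimePanes() {}

    /** Groups sorted arrival times (ms) into the panes the trigger would fire. */
    static List<List<Long>> panes(List<Long> sortedArrivalMillis, long delayMillis) {
        List<List<Long>> fired = new ArrayList<>();
        List<Long> pending = new ArrayList<>();
        long fireAt = 0L; // only meaningful while 'pending' is non-empty
        for (long arrival : sortedArrivalMillis) {
            if (!pending.isEmpty() && arrival > fireAt) {
                // The timer went off before this element arrived: emit the
                // pending pane and discard its contents.
                fired.add(pending);
                pending = new ArrayList<>();
            }
            if (pending.isEmpty()) {
                // First element in a new pane arms the processing-time timer.
                fireAt = arrival + delayMillis;
            }
            pending.add(arrival);
        }
        if (!pending.isEmpty()) {
            // Assume processing time eventually reaches the last armed timer.
            fired.add(pending);
        }
        return fired;
    }

    public static void main(String[] args) {
        // Arrivals at 0s, 1s, 2.5s, 4s and 10s of processing time, 3s delay.
        List<Long> arrivals = Arrays.asList(0L, 1_000L, 2_500L, 4_000L, 10_000L);
        System.out.println(panes(arrivals, 3_000L));
    }
}
```

With those arrivals the model produces three panes: [0, 1000, 2500], [4000] and [10000]. Each fired pane is a separate write for the same window, which is why the output names need to distinguish panes as well as windows.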

But sometimes, when data should be written, I get the following exception:

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.UnsupportedOperationException: There is no default policy for windowed file output. Please provide an explicit FilenamePolicy to generate filenames.
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
    at Test.main(Test.java:50)


Would it make sense to change TextIO so that it does not rely only on
DefaultFilenamePolicy, but, when windowed writes are enabled and the user has
not specified a filename policy, automatically uses a custom FilePerWindow
policy instead? I believe TextIO today always expects the user to specify a
FilenamePolicy for windowed writes, right?
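The proposed fallback can be sketched as a small selection rule. This is hypothetical Java, not TextIO's actual code: NamingPolicy stands in for Beam's real FilenamePolicy type, and the two built-in policies only describe themselves. An explicit user policy always wins. Without one, the sketch picks a per-window policy for windowed writes (the case that currently fails with the exception above) and the default policy otherwise.

```java
/**
 * Hypothetical sketch of the proposed fallback, not TextIO's actual code.
 */
final class PolicyFallback {
    private PolicyFallback() {}

    /** Stand-in for a filename policy; Beam's real type is different. */
    interface NamingPolicy {
        String describe();
    }

    static final NamingPolicy DEFAULT = () -> "default (unwindowed shard template)";
    static final NamingPolicy FILE_PER_WINDOW = () -> "file per window";

    /**
     * @param windowedWrites whether windowed writes were requested
     * @param userPolicy     the user's explicit policy, or null if none was given
     */
    static NamingPolicy choose(boolean windowedWrites, NamingPolicy userPolicy) {
        if (userPolicy != null) {
            return userPolicy; // an explicit choice always wins
        }
        // In the version discussed here, windowed writes without a policy fail
        // with "There is no default policy for windowed file output"; the
        // proposal is to fall back to a per-window policy instead.
        return windowedWrites ? FILE_PER_WINDOW : DEFAULT;
    }

    public static void main(String[] args) {
        System.out.println(choose(true, null).describe());
        System.out.println(choose(false, null).describe());
    }
}
```

The open question raised in the reply is whether any single FILE_PER_WINDOW naming scheme would suit enough use cases to be worth making the implicit choice.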

Or maybe we could expose a FilePerWindow policy as part of Beam? I believe
today there are only implementations in tests and examples, but nothing
publicly visible, right?



thanks


--
Jean-Baptiste Onofré
http://blog.nanthrax.net
Talend - http://www.talend.com
