Hi guys,

One of the key refactorings/new features we are bringing in the first stable release is the "new" Beam filesystems.

I started to play with them on a couple of use cases I have in beam-samples.

1/ TextIO.write() with unbounded PCollection (stream)

The first use case is the TextIO write with an unbounded PCollection (good timing, as we had a question about this on Slack yesterday).

I confirm that TextIO now supports unbounded PCollections. You have to apply a Window and "flag" TextIO to use windowing (withWindowedWrites()).

Here's the code snippet:

    pipeline
        .apply(JmsIO.read().withConnectionFactory(connectionFactory).withQueue("BEAM"))
        .apply(MapElements.via(new SimpleFunction<JmsRecord, String>() {
            public String apply(JmsRecord input) {
                return input.getPayload();
            }
        }))
        .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10))))
        .apply(TextIO.write()
            .to("/home/jbonofre/demo/beam/output/uc2")
            .withWindowedWrites()
            .withNumShards(3));

Thanks to Dan, I found an issue with the watermark in JmsIO: as it uses the JMS ack to advance the watermark, the session should use client ack rather than auto ack. I'm preparing a PR for JmsIO about this.
However, the "windowed" TextIO works fine.
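
To make the windowing part more concrete, here is a small standalone sketch (plain Java, not Beam code) of how a 10-second fixed window could be derived from an element timestamp. The class and method names are hypothetical, and it assumes non-negative timestamps and windows aligned on the epoch. The point is only that each element lands in exactly one window, so a windowed write can group its output per window.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative only: a simplified model of fixed windows, not Beam's implementation.
final class FixedWindowSketch {

    // Start (inclusive) of the fixed window containing timestampMillis.
    // Assumes timestampMillis >= 0 and windows aligned on the epoch.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    // End (exclusive) of the same window.
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = Duration.ofSeconds(10).toMillis();
        // Arbitrary example timestamp, chosen only for the demo.
        long ts = Instant.parse("2017-05-04T10:00:12.345Z").toEpochMilli();
        System.out.println("[" + Instant.ofEpochMilli(windowStart(ts, size))
                + ", " + Instant.ofEpochMilli(windowEnd(ts, size)) + ")");
    }
}
```

With the 10-second window from the snippet above, an element stamped at second 12 would fall in the window covering seconds 10 to 20.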

2/ Beam HDFS filesystem

The other use case is to use the "new" Beam filesystems with TextIO, especially HDFS.

So, in my pipeline, I'm using:

    .apply(TextIO.write().to("hdfs://localhost/home/jbonofre/demo/beam/output/uc1"));

In my pom.xml, I define both the Beam hadoop-file-system and the hadoop-client dependencies:

        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
            <version>0.7.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>

Unfortunately, when starting the pipeline, I get:

Exception in thread "main" java.lang.IllegalStateException: Unable to find registrar for hdfs
        at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:427)
        at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:494)
        at org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:193)
        at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:292)
        at org.apache.beam.samples.data.ingestion.JdbcToHdfs.main(JdbcToHdfs.java:39)
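
The message suggests that the filesystem is resolved from the scheme of the output location ("hdfs" here). As a rough mental model only (this is not Beam's actual code: the class, the method names, and the "file" default for scheme-less paths are assumptions of mine), such a lookup could look like this:

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: a simplified scheme-to-filesystem lookup, not Beam's FileSystems class.
final class SchemeLookupSketch {

    // Hypothetical registry: URI scheme -> filesystem name.
    private final Map<String, String> registrars = new HashMap<>();

    void register(String scheme, String fileSystemName) {
        registrars.put(scheme, fileSystemName);
    }

    // Resolves the filesystem for a location, failing when no registrar
    // is known for its scheme.
    String lookup(String spec) {
        String scheme = URI.create(spec).getScheme();
        if (scheme == null) {
            scheme = "file"; // assumption: paths without a scheme are local files
        }
        String fileSystem = registrars.get(scheme);
        if (fileSystem == null) {
            throw new IllegalStateException("Unable to find registrar for " + scheme);
        }
        return fileSystem;
    }

    public static void main(String[] args) {
        SchemeLookupSketch lookup = new SchemeLookupSketch();
        lookup.register("file", "local");
        System.out.println(lookup.lookup("/home/jbonofre/demo/beam/output/uc2"));
        try {
            lookup.lookup("hdfs://localhost/home/jbonofre/demo/beam/output/uc1");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the local path from the first use case resolves, while the hdfs:// one fails because nothing was registered for that scheme at lookup time. Whether that is what really happens in my pipeline is exactly what I need to check.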

I'm going to investigate tonight and I will let you know.

Regards
JB
--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
