Thanks Luke, I'm going to try that and I'll let you know.

Regards
JB

On 05/04/2017 06:07 PM, Lukasz Cwik wrote:
JB, for your second point it seems as though you may not be setting the Hadoop configuration on HadoopFileSystemOptions. Also, I just merged https://github.com/apache/beam/pull/2890, which will auto-detect the Hadoop configuration based on your HADOOP_CONF_DIR and YARN_CONF_DIR environment variables.

On Thu, May 4, 2017 at 8:58 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

Hi guys,

One of the key refactorings/new features we bring in the first stable release is the "new" Beam filesystems. I started to play with them on a couple of use cases I have in beam-samples.

1/ TextIO.write() with an unbounded PCollection (stream)

The first use case is TextIO write with an unbounded PCollection (good timing, as we had a question about this yesterday on Slack). I confirm that TextIO now supports unbounded PCollections: you have to create a Window and "flag" TextIO to use windowed writes. Here's the code snippet:

    pipeline
        .apply(JmsIO.read().withConnectionFactory(connectionFactory).withQueue("BEAM"))
        .apply(MapElements.via(new SimpleFunction<JmsRecord, String>() {
          public String apply(JmsRecord input) {
            return input.getPayload();
          }
        }))
        .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10))))
        .apply(TextIO.write()
            .to("/home/jbonofre/demo/beam/output/uc2")
            .withWindowedWrites()
            .withNumShards(3));

Thanks to Dan, I found an issue in the watermark of JmsIO (as it uses the JMS ack to advance the watermark, it should be client ack, not auto ack). I'm preparing a PR for JmsIO about this. However, the "windowed" TextIO works fine.

2/ Beam HDFS filesystem

The other use case is to use the "new" Beam filesystem with TextIO, especially HDFS.
So, in my pipeline, I'm using:

    .apply(TextIO.write().to("hdfs://localhost/home/jbonofre/demo/beam/output/uc1"));

In my pom.xml, I define both the Beam hadoop-file-system and hadoop-client dependencies:

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
      <version>0.7.0-SNAPSHOT</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.7.3</version>
    </dependency>

Unfortunately, when starting the pipeline, I get:

    Exception in thread "main" java.lang.IllegalStateException: Unable to find registrar for hdfs
        at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:427)
        at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:494)
        at org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:193)
        at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:292)
        at org.apache.beam.samples.data.ingestion.JdbcToHdfs.main(JdbcToHdfs.java:39)

I'm going to investigate tonight and I will let you know.

Regards
JB
--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
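[Editor's note: Beam discovers filesystem registrars for schemes like "hdfs" via java.util.ServiceLoader from META-INF/services files. Besides the missing Hadoop configuration that Lukasz points to above, another common cause of "Unable to find registrar for <scheme>" is running from a shaded (fat) jar whose shade plugin did not merge those service files. This is only an assumption about JB's build, but if a shaded jar is involved, a ServicesResourceTransformer in the maven-shade-plugin configuration is typically required:]

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <configuration>
        <transformers>
          <!-- Merges META-INF/services entries from all dependencies so
               the HDFS FileSystemRegistrar remains discoverable via
               ServiceLoader inside the shaded jar. -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </plugin>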
--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
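[Editor's note: Lukasz's first suggestion, setting the Hadoop configuration on HadoopFileSystemOptions before the pipeline is built, might look like the sketch below. The class name, the NameNode address in fs.defaultFS, and the options wiring are illustrative assumptions, not taken from the thread:]

    import java.util.Collections;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.hadoop.conf.Configuration;

    public class HdfsOptionsSketch {
      public static void main(String[] args) {
        // Make PipelineOptionsFactory aware of the HDFS-specific options.
        PipelineOptionsFactory.register(HadoopFileSystemOptions.class);

        HadoopFileSystemOptions options =
            PipelineOptionsFactory.fromArgs(args).as(HadoopFileSystemOptions.class);

        // Point the Beam HDFS filesystem at the cluster. The address here is
        // an assumed placeholder -- use your own NameNode host and port.
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:8020");
        options.setHdfsConfiguration(Collections.singletonList(conf));

        Pipeline pipeline = Pipeline.create(options);
        // ... build the pipeline; TextIO.write().to("hdfs://...") should now
        // resolve the "hdfs" scheme through the Hadoop filesystem registrar.
      }
    }

With PR #2890 merged, exporting HADOOP_CONF_DIR (or YARN_CONF_DIR) before launching should make this explicit wiring unnecessary, since the configuration is then auto-detected from those directories.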