JB, the ConfigurationLocator is the default instance factory for the hdfsConfiguration option. As long as HADOOP_CONF_DIR/YARN_CONF_DIR is set correctly, you don't need to call the locator yourself; you should only need to write:

  Pipeline pipeline = Pipeline.create(options);
  ...
  pipeline.apply(TextIO.write().to("hdfs://localhost/path"));
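Roughly speaking, the auto-detection reads the two environment variables and picks up the Hadoop XML files it finds in those directories. Here is a hypothetical, stdlib-only simplification of that idea. It is not the code merged in PR 2890: `HadoopConfDirSketch` and `locate` are made-up names, the choice of core-site.xml/hdfs-site.xml is an assumption, and it returns the files it finds instead of building Hadoop `Configuration` objects.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch: locate Hadoop config files via HADOOP_CONF_DIR / YARN_CONF_DIR.
 * Assumption: not Beam's implementation; it returns files, not Hadoop Configuration objects.
 */
final class HadoopConfDirSketch {

  // Environment variables consulted, in this order.
  private static final String[] CONF_DIR_VARS = {"HADOOP_CONF_DIR", "YARN_CONF_DIR"};

  // Files looked up in each directory (assumed set, for illustration only).
  private static final String[] CONF_FILES = {"core-site.xml", "hdfs-site.xml"};

  /** Returns one list of config files per distinct, existing directory that contains any of them. */
  static List<List<File>> locate(Map<String, String> env) {
    // A set, so both variables pointing at the same path string are only scanned once.
    Set<String> dirs = new LinkedHashSet<>();
    for (String var : CONF_DIR_VARS) {
      String value = env.get(var);
      if (value != null && !value.isEmpty()) {
        dirs.add(value);
      }
    }

    List<List<File>> result = new ArrayList<>();
    for (String dir : dirs) {
      if (!new File(dir).isDirectory()) {
        continue; // skip variables that point at a missing directory
      }
      List<File> found = new ArrayList<>();
      for (String name : CONF_FILES) {
        File candidate = new File(dir, name);
        if (candidate.isFile()) {
          found.add(candidate);
        }
      }
      if (!found.isEmpty()) {
        result.add(found);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Usage against the real process environment.
    for (List<File> files : locate(System.getenv())) {
      System.out.println(files);
    }
  }
}
```

Taking the environment as a `Map` rather than calling `System.getenv()` inside `locate` keeps the lookup easy to test; `main` shows the call against the real environment.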
On Fri, May 5, 2017 at 6:23 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> Hi guys,
>
> Thanks Luke, I updated my pipeline like this:
>
> HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(args)
>     .withValidation().as(HadoopFileSystemOptions.class);
> HadoopFileSystemOptions.ConfigurationLocator locator =
>     new HadoopFileSystemOptions.ConfigurationLocator();
> List<Configuration> configurations = locator.create(options);
> Pipeline pipeline = Pipeline.create(options);
> ...
> pipeline.apply(TextIO.write().to("hdfs://localhost/path"));
>
> I defined the HADOOP_CONF_DIR env variable pointing to the folder where I have
> hdfs-site.xml, and it works fine.
>
> I saw that the README.md in hadoop-file-system is not up to date. I'm
> preparing a PR about that, and I'm also preparing quick documentation about
> HDFS support.
>
> Regards
> JB
>
> On 05/04/2017 06:07 PM, Lukasz Cwik wrote:
>
>> JB, for your second point it seems as though you may not be setting the
>> Hadoop configuration on HadoopFileSystemOptions.
>> Also, I just merged https://github.com/apache/beam/pull/2890, which will
>> auto-detect the Hadoop configuration based on your HADOOP_CONF_DIR and
>> YARN_CONF_DIR environment variables.
>>
>> On Thu, May 4, 2017 at 8:58 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>
>> Hi guys,
>>
>> One of the key refactorings/new features we bring in the first stable
>> release is the "new" Beam filesystems.
>>
>> I started to play with it on a couple of use cases I have in beam-samples.
>>
>> 1/ TextIO.write() with unbounded PCollection (stream)
>>
>> The first use case is the TextIO write with an unbounded PCollection (good
>> timing, as we had a question yesterday about this on Slack).
>>
>> I confirm that TextIO now supports unbounded PCollections. You have to
>> create a Window and "flag" TextIO to use windowing.
>>
>> Here's the code snippet:
>>
>> pipeline
>>     .apply(JmsIO.read().withConnectionFactory(connectionFactory).withQueue("BEAM"))
>>     .apply(MapElements.via(new SimpleFunction<JmsRecord, String>() {
>>       public String apply(JmsRecord input) {
>>         return input.getPayload();
>>       }
>>     }))
>>     .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10))))
>>     .apply(TextIO.write()
>>         .to("/home/jbonofre/demo/beam/output/uc2")
>>         .withWindowedWrites()
>>         .withNumShards(3));
>>
>> Thanks to Dan, I found an issue in the JmsIO watermark: as it uses the JMS
>> ack to advance the watermark, it should use client acknowledgement rather
>> than auto acknowledgement. I'm preparing a PR for JmsIO about this.
>> However, the "windowed" TextIO works fine.
>>
>> 2/ Beam HDFS filesystem
>>
>> The other use case is to use the "new" Beam filesystem with TextIO,
>> especially HDFS.
>>
>> So, in my pipeline, I'm using:
>>
>>     .apply(TextIO.write().to("hdfs://localhost/home/jbonofre/demo/beam/output/uc1"));
>>
>> In my pom.xml, I define both the Beam hadoop-file-system and hadoop-client
>> dependencies:
>>
>> <dependency>
>>   <groupId>org.apache.beam</groupId>
>>   <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
>>   <version>0.7.0-SNAPSHOT</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.hadoop</groupId>
>>   <artifactId>hadoop-client</artifactId>
>>   <version>2.7.3</version>
>> </dependency>
>>
>> Unfortunately, when starting the pipeline, I get:
>>
>> Exception in thread "main" java.lang.IllegalStateException: Unable to find registrar for hdfs
>>     at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:427)
>>     at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:494)
>>     at org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:193)
>>     at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:292)
>>     at org.apache.beam.samples.data.ingestion.JdbcToHdfs.main(JdbcToHdfs.java:39)
>>
>> I'm going to investigate tonight and will let you know.
>>
>> Regards
>> JB
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
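On the windowed TextIO write in the quoted thread: FixedWindows.of(Duration.standardSeconds(10)) assigns each element to the 10-second window that contains its timestamp. A minimal sketch of that arithmetic, assuming epoch-millisecond timestamps and a zero offset, might look like this; `FixedWindowSketch`, `windowStart` and `windowEnd` are illustrative names, not Beam API.

```java
/** Hypothetical illustration of fixed-window assignment; not Beam's FixedWindows code. */
final class FixedWindowSketch {

  /** Inclusive start of the fixed window of size sizeMillis containing timestampMillis. */
  static long windowStart(long timestampMillis, long sizeMillis) {
    // floorMod keeps the result correct for negative (pre-epoch) timestamps too.
    return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
  }

  /** Exclusive end of that same window. */
  static long windowEnd(long timestampMillis, long sizeMillis) {
    return windowStart(timestampMillis, sizeMillis) + sizeMillis;
  }

  public static void main(String[] args) {
    long size = 10_000L; // 10 seconds, matching Duration.standardSeconds(10)
    long[] timestamps = {0L, 9_999L, 10_000L, 25_432L};
    for (long ts : timestamps) {
      System.out.printf("%d -> [%d, %d)%n", ts, windowStart(ts, size), windowEnd(ts, size));
    }
  }
}
```

Elements at 0 ms and 9,999 ms land in the same window, while 10,000 ms starts the next one. Because every element belongs to exactly one such window, a windowed write can emit separate output files per window.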