Hello, I achieved this (reading a text file with HDFSFileSource) like this:
PCollection<String> data = pipeline
    .apply("ReadFromHDFS", Read.from(
        HDFSFileSource.from(options.getInput(), TextInputFormat.class,
            LongWritable.class, Text.class)))
    .apply("ExtractPayload", ParDo.of(new DoFn<KV<LongWritable, Text>, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) throws Exception {
        c.output(c.element().getValue().toString());
      }
    }));

Probably there is a better way, but this one worked for me.

Writing was easier, I think; it was something like this:

    .apply("WriteToHDFS", Write.to(new HDFSFileSink(options.getOutput(), TextOutputFormat.class)));

Hope it helps,
Ismaël

On Fri, Nov 25, 2016 at 6:44 PM, Sandeep Deshmukh <sand...@datatorrent.com> wrote:
> Hi,
>
> I am trying to use the recently added support for the Apex runner, to run
> the program [1] using Apex on a Hadoop cluster. The program launches
> successfully.
>
> I would like to change the input and output to HDFS. I looked at
> HDFSFileSource and plan to use it: reading a simple text file from HDFS
> and, in the same way, writing to HDFS.
>
> I tried something like the snippet below, but it looks like I am missing
> something trivial.
>
> Pipeline p = Pipeline.create(options);
> HDFSFileSource<IntWritable, Text> source =
>     HDFSFileSource.from("filePath",
>         SequenceFileInputFormat.class, IntWritable.class, Text.class);
>
> p.apply("ReadLines", source)
>  .apply(new CountWords())
>  .apply(....)
>
> What would be the right format and <Key, Value> pair to use for this?
>
> [1] https://github.com/tweise/apex-samples/blob/master/beam-apex-wordcount/src/main/java/com/example/myapexapp/Application.java
>
> Regards,
> Sandeep
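In case it helps to see the ExtractPayload step in isolation: the sketch below is a dependency-free illustration (plain Java, no Beam or Hadoop on the classpath; Long and String stand in for LongWritable and Text) of the keep-only-the-value logic the DoFn applies to each key-value record. The class and method names are my own, not part of any API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ExtractPayload {

    // Mirrors the ExtractPayload DoFn: each input record is a key-value pair
    // (the byte offset and the line, like LongWritable/Text from
    // TextInputFormat), and only the value is kept.
    static List<String> extractValues(List<? extends Map.Entry<Long, String>> records) {
        return records.stream()
                .map(r -> r.getValue().toString())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<SimpleEntry<Long, String>> records = Arrays.asList(
                new SimpleEntry<>(0L, "first line"),
                new SimpleEntry<>(11L, "second line"));
        System.out.println(extractValues(records)); // prints [first line, second line]
    }
}
```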