Hi Sandeep,

Are you using the Apex runner? The default number of output shards is runner-dependent.
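You can set the number of write shards on the Write transform. A minimal sketch against your WriteToHDFS step (untested on my side, and assuming Write.Bound#withNumShards as documented at the link below; note that forcing a single shard funnels all output through one writer and can limit parallelism):

    .apply("WriteToHDFS",
        Write.to(new HDFSFileSink(options.getOutput(), TextOutputFormat.class))
            // Request exactly one output file instead of a
            // runner-chosen number of shards.
            .withNumShards(1));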
For details, see the Write documentation: https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java#L75

Does that answer your question?

On Mon, Nov 28, 2016 at 2:22 PM Sandeep Deshmukh <sand...@datatorrent.com> wrote:

> I am able to read and write from HDFS successfully. Here is my code
> snippet:
>
>     p.apply("ReadFromHDFS",
>             Read.from(HDFSFileSource.from(options.getInputFile(),
>                 TextInputFormat.class, LongWritable.class, Text.class)))
>      .apply("ExtractPayload", ParDo.of(new ExtractString()))
>      .apply(new CountWords())
>      .apply("WriteToHDFS",
>          Write.to(new HDFSFileSink(options.getOutput(), TextOutputFormat.class)));
>
> One observation is that the output is one file per word. How can I store
> all the data in a single file and avoid creating one file per word?
>
> Regards,
> Sandeep
>
> On Sat, Nov 26, 2016 at 9:45 AM, Sandeep Deshmukh <sand...@datatorrent.com> wrote:
>
> Thanks. It helped. I faced serialisation issues, so I moved the DoFn code
> into a static class and it worked.
>
> I used the following imports:
>
>     import org.apache.hadoop.io.LongWritable;
>     import org.apache.hadoop.io.Text;
>     import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
>
> At least I am now able to move on on the input side; I will check the
> output side too.
>
> Thanks & Regards,
> Sandeep
>
> On Sat, Nov 26, 2016 at 1:35 AM, Ismaël Mejía <ieme...@gmail.com> wrote:
>
> Hello,
>
> I achieved this (reading a text file from an HDFSFileSource) like this:
>
>     PCollection<String> data = pipeline
>         .apply("ReadFromHDFS",
>             Read.from(HDFSFileSource.from(options.getInput(),
>                 TextInputFormat.class, LongWritable.class, Text.class)))
>         .apply("ExtractPayload",
>             ParDo.of(new DoFn<KV<LongWritable, Text>, String>() {
>               @ProcessElement
>               public void processElement(ProcessContext c) throws Exception {
>                 c.output(c.element().getValue().toString());
>               }
>             }));
>
> There is probably a better way, but this one worked for me. Writing was
> easier, I think; it was something like this:
>
>     .apply("WriteToHDFS",
>         Write.to(new HDFSFileSink(options.getOutput(), TextOutputFormat.class)));
>
> Hope it helps,
> Ismaël
>
> On Fri, Nov 25, 2016 at 6:44 PM, Sandeep Deshmukh <sand...@datatorrent.com> wrote:
>
> Hi,
>
> I am trying to use the recently added support for the Apex runner, to run
> the program [1] using Apex on a Hadoop cluster. The program is getting
> launched successfully.
>
> I would like to change the input and output to HDFS. I looked at
> HDFSFileSource and am planning to use it. I would be reading a simple text
> file from HDFS and writing to HDFS in the same way.
>
> I tried something like the snippet below, but it looks like I am missing
> something trivial:
>
>     Pipeline p = Pipeline.create(options);
>     HDFSFileSource<IntWritable, Text> source =
>         HDFSFileSource.from("filePath",
>             SequenceFileInputFormat.class, IntWritable.class, Text.class);
>
>     p.apply("ReadLines", source)
>      .apply(new CountWords())
>      .apply(....)
>
> What would be the right InputFormat and <Key, Value> types to use for this?
>
> [1] https://github.com/tweise/apex-samples/blob/master/beam-apex-wordcount/src/main/java/com/example/myapexapp/Application.java
>
> Regards,
> Sandeep
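PS, on the <Key, Value> question at the bottom of the thread: the key/value types follow the InputFormat, so for plain text files TextInputFormat yields KV<LongWritable, Text> (byte offset and line text), and the source has to be wrapped in Read.from(...) rather than applied to the pipeline directly. A condensed, untested sketch of what Ismaël showed above:

    HDFSFileSource<LongWritable, Text> source =
        HDFSFileSource.from(options.getInput(),
            TextInputFormat.class, LongWritable.class, Text.class);

    p.apply("ReadFromHDFS", Read.from(source))
     // Keep only the line text; the LongWritable key is just the byte offset.
     .apply("ExtractPayload", ParDo.of(new DoFn<KV<LongWritable, Text>, String>() {
       @ProcessElement
       public void processElement(ProcessContext c) {
         c.output(c.element().getValue().toString());
       }
     }))
     .apply(new CountWords());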