"cf" in this example is a column family, and this needs to exist in the tables (both input and output) before the job is submitted.
On 8/26/13 3:01 PM, "jamal sasha" <jamalsha...@gmail.com> wrote: >Hi, > I am new to hbase, so a few noob questions. > >So, I created a table in hbase: >A quick scan gives me the following: >hbase(main):001:0> scan 'test' >ROW COLUMN+CELL > > > row1 column=cf:word, >timestamp=1377298314160, value=foo > > row2 column=cf:word, >timestamp=1377298326124, value=bar > > row3 column=cf:word, >timestamp=1377298332856, value=bar foo > > row4 column=cf:word, >timestamp=1377298347602, value=bar world foo > >Now, I want to do the word count and write the result back to another >table >in hbase >So I followed the code given below: >http://hbase.apache.org/book.html#mapreduce >Snapshot in the end: >Now, I am getting an error > >java.lang.NullPointerException >at java.lang.String.<init>(String.java:601) >at org.rdf.HBaseExperiment$MyMapper.map(HBaseExperiment.java:42) >at org.rdf.HBaseExperiment$MyMapper.map(HBaseExperiment.java:1) >at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144) >at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764) >at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370) >at org.apache.hadoop.mapred.Child$4.run(Child.java:255) >at java.security.AccessController.doPrivileged(Native Method) >at javax.security.auth.Subject.doAs(Subject.java:416) >at >org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation. >java:1093) >at org.apache.hadoop.mapred.Child.main(Child.java:249) > >Line 42 points to >*public static final byte[] ATTR1 = "attr1".getBytes();* > >Now I think attr1 is a family qualifier. >I am wondering, what exactly is a family qualifier? >Do I need to set something while creating a table, just like I did "cf" >when >I was creating the table? >Similarly, what do I need to do on the "output" table as well? >So, what I am saying is.. what do I need to do on the hbase shell so that I >can >run this word count example? 
>Thanks

import java.io.IOException;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

/**
 * Word count over an HBase table: reads every row of table "test"
 * (column {@code cf:word}), counts each whitespace-separated word, and
 * writes one row per word into table "output" as {@code cf:count}.
 *
 * <p>Both tables, each with column family "cf", must exist before the
 * job is submitted (create them in the shell, e.g.
 * {@code create 'output', 'cf'}).
 */
public class HBaseExperiment {

    /** Emits (word, 1) for every word found in a row's cf:word cell. */
    public static class MyMapper extends TableMapper<Text, IntWritable> {
        public static final byte[] CF = Bytes.toBytes("cf");
        // Must name the qualifier actually present in the input table.
        // The shell scan shows "column=cf:word", so the qualifier is
        // "word", not "attr1" — asking for a non-existent column makes
        // getValue() return null, which was the reported NPE.
        public static final byte[] ATTR1 = Bytes.toBytes("word");

        private static final IntWritable ONE = new IntWritable(1);
        private final Text text = new Text();

        @Override
        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            byte[] cell = value.getValue(CF, ATTR1);
            if (cell == null) {
                // Row has no cf:word cell; skip it instead of
                // crashing in new String(null).
                return;
            }
            // A cell may hold several words ("bar world foo");
            // emit (word, 1) for each one, not the whole value.
            for (String word : Bytes.toString(cell).split("\\s+")) {
                if (!word.isEmpty()) {
                    text.set(word);
                    context.write(text, ONE);
                }
            }
        }
    }

    /** Sums the counts per word and writes cf:count into the output table. */
    public static class MyTableReducer
            extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
        public static final byte[] CF = Bytes.toBytes("cf");
        public static final byte[] COUNT = Bytes.toBytes("count");

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            // Row key of the output table is the word itself.
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(CF, COUNT, Bytes.toBytes(sum));
            // Null row key tells the framework to take the key from the Put.
            context.write(null, put);
        }
    }

    /**
     * Configures and runs the job: table "test" in, table "output" out.
     *
     * @throws IOException if the job fails
     */
    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        Job job = new Job(config, "ExampleSummary");
        job.setJarByClass(HBaseExperiment.class); // jar containing mapper and reducer

        Scan scan = new Scan();
        scan.setCaching(500);        // Scan's default of 1 is bad for MR jobs
        scan.setCacheBlocks(false);  // don't pollute the block cache from an MR scan

        TableMapReduceUtil.initTableMapperJob(
                "test",            // input table
                scan,              // Scan instance controlling CF/attribute selection
                MyMapper.class,    // mapper class
                Text.class,        // mapper output key
                IntWritable.class, // mapper output value
                job);
        TableMapReduceUtil.initTableReducerJob(
                "output",              // output table
                MyTableReducer.class,  // reducer class
                job);
        job.setNumReduceTasks(1); // at least one; adjust as required

        long start = new Date().getTime();
        if (!job.waitForCompletion(true)) {
            throw new IOException("error with job!");
        }
        long end = new Date().getTime();
        System.out.println("Job took " + ((end - start) / 1000) + " seconds");
    }
}