Hello guys,
I am using hadoop-0.19.1 and hbase-0.19.3 on a cluster of four nodes. I have
created an HBase table with one column family 'cf' and several columns
'cf:Value, cf:SensorType, cf:SensorID, ...'. The table has 100 records in
total. The program searches only for SensorType=temperature and computes the
average of all the temperature readings. The job is divided as follows:
*Mapper:*
The mapper scans each record in the table; whenever it finds
SensorType=temperature it executes two steps. First, it takes the value of
the reading and adds it to the previously found values; the variable
"sumreadings" holds the running sum. Second, the mapper increments a counter
called "numreadings". When there are no more records in the table, it
concatenates the value of "sumreadings" with the value of "numreadings" into
a Text and passes the result to the OutputCollector.
*Reducer:*
The reducer computes the average by summing all the received "sumreadings"
values and dividing the result by the sum of the "numreadings" values (a
sketch of this reducer follows the mapper snapshot below).
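For completeness, this is roughly how the job is wired up. It is a
simplified sketch, not my exact driver: the table name "sensors" and the
class names AverageTemperatureJob, TemperatureMapper, and TemperatureReducer
are placeholders, and it uses the org.apache.hadoop.hbase.mapred API that
ships with hbase-0.19.

*import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class AverageTemperatureJob {
    public static void main(String[] args) throws Exception {
        // placeholder driver class; "sensors" is a placeholder table name
        JobConf job = new JobConf(new HBaseConfiguration(), AverageTemperatureJob.class);
        // hook the table scan into the map phase; columns are space-separated
        TableMapReduceUtil.initTableMapJob("sensors",
                "cf:Value cf:Type cf:TimeStamp cf:Latitude cf:Longitude cf:SensorNode",
                TemperatureMapper.class, Text.class, Text.class, job);
        job.setReducerClass(TemperatureReducer.class);
        job.setNumReduceTasks(1);
        FileOutputFormat.setOutputPath(job, new Path("/tmp/avg-temperature"));
        JobClient.runJob(job);
    }
}*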
*A snapshot of the Mapper:*
*public void map(ImmutableBytesWritable key, RowResult value,
        OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
    double numberreadings = 0;
    double sumreadings = 0;
    if (table == null)
        throw new IOException("table is null");
    // set up a scanner
    Scanner scanner = table.getScanner(new String[] {"cf:Value", "cf:Type",
            "cf:TimeStamp", "cf:Latitude", "cf:Longitude", "cf:SensorNode"});
    RowResult rowresult = scanner.next();
    // scan the table, filter out the temperature values, and count them
    while (rowresult != null) {
        String stringtype = new String(rowresult.get(Bytes.toBytes("cf:Type")).getValue());
        if (stringtype.equals("temperature")) {
            // sum the reading value
            String stringval = new String(rowresult.get(Bytes.toBytes("cf:Value")).getValue());
            double doubleval = Double.parseDouble(stringval.trim());
            sumreadings = sumreadings + doubleval;
            // count the reading
            numberreadings = numberreadings + 1;
        }
        rowresult = scanner.next();
    }
    scanner.close();
    // send the sum of the values as well as their count
    String strsumreadings = Double.toString(sumreadings);
    String strnumberreadings = Double.toString(numberreadings);
    String strmapoutvalue = strsumreadings + " " + strnumberreadings;
    mapoutputvalue.set(strmapoutvalue);
    output.collect(mapoutputkey, mapoutputvalue);
}*
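The reducer is not pasted above; the following is a minimal sketch of the
logic described earlier (sum the partial sums, divide by the total count).
It is a reconstruction, not my exact code, and the class name
TemperatureReducer is a placeholder:

*import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class TemperatureReducer extends MapReduceBase
        implements Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        double totalsum = 0;
        double totalcount = 0;
        while (values.hasNext()) {
            // each map output value is "sumreadings numreadings"
            String[] parts = values.next().toString().split(" ");
            totalsum += Double.parseDouble(parts[0]);
            totalcount += Double.parseDouble(parts[1]);
        }
        // the average over all readings from all map outputs
        output.collect(key, new Text(Double.toString(totalsum / totalcount)));
    }
}*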
*Questions:*
1- For 100 records I get 1 map task and 1 reduce task, and the job finishes
after 12 seconds. When I extend the number of records in the HTable to
10,000, I still get 1 map and 1 reduce task, but the job takes 1 hour!
The mapper is incredibly slow; what is so heavy in my code?
2- I have checked the web UI at http://IP:50030/ and noticed the following
counters:
Map Input Records: Map=100, Reduce=0
Map Output Records: Map=100, Reduce=0
Map Output Bytes: Map=2,500, Reduce=0
Since I am calling collect() on the map's OutputCollector only once, as you
can see from the snapshot of the code, how come Map Output Records=100?
Shouldn't it be 1?
Thank you for your help,
CJ