[hbase] TableInputFormat erroneously aggregates map values
----------------------------------------------------------
Key: HADOOP-2234
URL: https://issues.apache.org/jira/browse/HADOOP-2234
Project: Hadoop
Issue Type: Bug
Components: contrib/hbase
Reporter: stack
Priority: Minor
Edward Yoon reports the following phenomenon:
Given a table:
{code}
row1 a: <aa> b: <bb> a:ca <aa2>
row2 a: <aa3> b: <bb3>
row3 a: <aa4> b: <bb4>
{code}
This map code:
{code}
public void map(WritableComparable key, Writable value,
    OutputCollector output, Reporter reporter) throws IOException {
  if (m_collector.collector == null) {
    m_collector.collector = output;
  }
  HStoreKey hKey = (HStoreKey) key;
  MapWritable newValue = (MapWritable) value;
  newValue.put(new Text("row:" + hKey.getRow().toString()),
      new ImmutableBytesWritable(hKey.getRow().toString().getBytes()));
  Map<Text, String> log = new HashMap<Text, String>();
  for (Map.Entry<Writable, Writable> e : newValue.entrySet()) {
    log.put(e.getKey(), e.getValue()); // abbreviated code
  }
  LOG.info(log);
  output.collect(hKey, newValue);
}
{code}
... produces the following.
{code}
07/11/20 14:07:53 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
07/11/20 14:07:53 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
07/11/20 14:07:53 INFO mapred.MapTask: numReduceTasks: 1
07/11/20 14:07:53 INFO algebra.SortMap: {a:=aa, b:=bb, a:da=aa44, a:ca=aa2}
07/11/20 14:07:53 INFO algebra.SortMap: {a:=aa3, b:=bb3, a:da=aa44, a:ca=aa2}
07/11/20 14:07:53 INFO algebra.SortMap: {a:=aa4, b:=bb4, a:da=aa44, a:ca=aa2}
07/11/20 14:07:53 INFO mapred.LocalJobRunner:
07/11/20 14:07:53 INFO mapred.TaskRunner: Task 'map_0000' done.
07/11/20 14:07:53 INFO algebra.SortReduce: {a:=aa, b:=bb, a:da=aa44, a:ca=aa2}
07/11/20 14:07:53 INFO algebra.SortReduce: {a:=aa3, b:=bb3, a:da=aa44, a:ca=aa2}
07/11/20 14:07:53 INFO algebra.SortReduce: {a:=aa4, b:=bb4, a:da=aa44, a:ca=aa2}
07/11/20 14:07:53 INFO mapred.LocalJobRunner: reduce > reduce
07/11/20 14:07:53 INFO mapred.TaskRunner: Task 'reduce_9ji2mr' done.
{code}
Notice how content from the first row is present when you output the second and
third rows.
The problem is in TableInputFormat (TIF): after calling scanner.next, it copies
the returned values into the passed-in MapWritable value (converting from
TreeMap to MapWritable). It resets the TreeMap passed to scanner.next on each
call, but never clears the passed-in MapWritable, so entries from earlier rows
persist.
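The effect of reusing a value map without clearing it can be reproduced outside of Hadoop. The following is only a minimal sketch, not the TableInputFormat code: plain java.util maps stand in for the scanner's TreeMap and for the reused MapWritable, and the names ReusedValueSketch, sampleRows and scan are hypothetical. It suggests that clearing the reused map before each copy would be one possible fix.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ReusedValueSketch {

    // Rows from the example table above, keyed by column name.
    static List<TreeMap<String, String>> sampleRows() {
        List<TreeMap<String, String>> rows = new ArrayList<>();
        TreeMap<String, String> row1 = new TreeMap<>();
        row1.put("a:", "aa");
        row1.put("b:", "bb");
        row1.put("a:ca", "aa2");
        TreeMap<String, String> row2 = new TreeMap<>();
        row2.put("a:", "aa3");
        row2.put("b:", "bb3");
        TreeMap<String, String> row3 = new TreeMap<>();
        row3.put("a:", "aa4");
        row3.put("b:", "bb4");
        rows.add(row1);
        rows.add(row2);
        rows.add(row3);
        return rows;
    }

    // Copies each row into one shared value map, as a record reader that
    // reuses its value object would. When clearFirst is false, the shared
    // map is never reset, which mimics the behaviour described in this issue.
    static List<Map<String, String>> scan(List<TreeMap<String, String>> rows,
                                          boolean clearFirst) {
        Map<String, String> value = new HashMap<>(); // stand-in for the reused MapWritable
        List<Map<String, String>> seen = new ArrayList<>();
        for (TreeMap<String, String> row : rows) {
            if (clearFirst) {
                value.clear(); // possible fix: drop entries left over from earlier rows
            }
            value.putAll(row);
            seen.add(new TreeMap<>(value)); // snapshot of what map() would receive
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("without clear: " + scan(sampleRows(), false));
        System.out.println("with clear:    " + scan(sampleRows(), true));
    }
}
```

Without the clear, the snapshots for row2 and row3 still carry the a:ca entry from row1. With the clear, each snapshot holds only that row's own columns.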
There is a similar problem in the reduce, where the outputter is collecting
values together (see the log above). Need to figure out what's going on here.
Below is the reduce code:
{code}
while (values.hasNext()) {
  MapWritable data = (MapWritable) values.next();
  Map<String, String> log = new HashMap<String, String>();
  for (Map.Entry<Writable, Writable> e : data.entrySet()) {
    log.put(e.getKey().toString(),
        new String(((ImmutableBytesWritable) e.getValue()).get()));
  }
  LOG.info(log);
{code}