[ https://issues.apache.org/jira/browse/HBASE-8874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated HBASE-8874:
--------------------------

    Hadoop Flags: Reviewed

Integrated to 0.95 and trunk.

Thanks for the patch, Rajeshbabu.

Thanks for the reviews.

> PutCombiner is skipping KeyValues while combining puts of same row during bulkload
> ----------------------------------------------------------------------------------
>
>                 Key: HBASE-8874
>                 URL: https://issues.apache.org/jira/browse/HBASE-8874
>             Project: HBase
>          Issue Type: Bug
>          Components: mapreduce
>    Affects Versions: 0.95.0, 0.95.1
>            Reporter: rajeshbabu
>            Assignee: rajeshbabu
>            Priority: Critical
>             Fix For: 0.98.0, 0.95.2
>
>         Attachments: HBASE-8874_trunk_2.patch, HBASE-8874_trunk_3.patch, HBASE-8874_trunk.patch
>
>
> While combining Puts for the same row in the map phase, PutCombiner#reduce uses the logic below.
> On the first iteration of the for loop, one Put object is added to the puts map.
> On every later iteration, putAll replaces the KeyValues already stored for a family with the
> KeyValues of the same family from the other Put, instead of adding to them.
> As a result, we mostly write the contents of a single Put to the map output and the remaining
> KeyValues are skipped (data loss).
> {code}
>     Map<byte[], Put> puts = new TreeMap<byte[], Put>(Bytes.BYTES_COMPARATOR);
>     for (Put p : vals) {
>       cnt++;
>       if (!puts.containsKey(p.getRow())) {
>         puts.put(p.getRow(), p);
>       } else {
>         puts.get(p.getRow()).getFamilyMap().putAll(p.getFamilyMap());
>       }
>     }
> {code}
> We need to change the logic along the lines below, because we can be sure that the rowkey of
> all the Puts is the same.
> {code}
>     Put finalPut = null;
>     Map<byte[], List<? extends Cell>> familyMap = null;
>     for (Put p : vals) {
>       cnt++;
>       if (finalPut == null) {
>         // The first Put becomes the accumulator for the row.
>         finalPut = p;
>         familyMap = finalPut.getFamilyMap();
>       } else {
>         for (Entry<byte[], List<? extends Cell>> entry : p.getFamilyMap().entrySet()) {
>           List<? extends Cell> list = familyMap.get(entry.getKey());
>           if (list == null) {
>             familyMap.put(entry.getKey(), entry.getValue());
>           } else {
>             // Append to the family's existing KeyValues instead of replacing them.
>             ((List<KeyValue>) list).addAll((List<KeyValue>) entry.getValue());
>           }
>         }
>       }
>     }
>     context.write(row, finalPut);
> {code}
> We also need to implement the TODOs mentioned by Nick:
> {code}
>     // TODO: would be better if we knew <code>K row</code> and Put rowkey were
>     // identical. Then this whole Put buffering business goes away.
>     // TODO: Could use HeapSize to create an upper bound on the memory size of
>     // the puts map and flush some portion of the content while looping. This
>     // flush could result in multiple Puts for a single rowkey. That is
>     // acceptable because Combiner is run as an optimization and it's not
>     // critical that all Puts are grouped perfectly.
> {code}
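
The difference between the two loops quoted above can be reproduced without HBase. The following is a minimal sketch, assuming plain Java collections as stand-ins: a Map<String, List<String>> plays the role of the map returned by Put#getFamilyMap(), and the names PutCombineSketch, familyMap, samplePuts, combineWithPutAll and combineByAppending are hypothetical and not part of HBase. It is not the committed patch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PutCombineSketch {

    // Stand-in for Put#getFamilyMap(): family name -> cells of that family.
    static Map<String, List<String>> familyMap(String family, String... cells) {
        Map<String, List<String>> m = new TreeMap<>();
        m.put(family, new ArrayList<>(Arrays.asList(cells)));
        return m;
    }

    // Three "puts" for the same row, each carrying one cell in family "cf".
    static List<Map<String, List<String>>> samplePuts() {
        List<Map<String, List<String>>> puts = new ArrayList<>();
        puts.add(familyMap("cf", "q1=v1"));
        puts.add(familyMap("cf", "q2=v2"));
        puts.add(familyMap("cf", "q3=v3"));
        return puts;
    }

    // Mirrors the reported logic: putAll replaces the whole list of a family.
    static Map<String, List<String>> combineWithPutAll(List<Map<String, List<String>>> puts) {
        Map<String, List<String>> combined = null;
        for (Map<String, List<String>> p : puts) {
            if (combined == null) {
                combined = p;
            } else {
                combined.putAll(p);
            }
        }
        return combined;
    }

    // Mirrors the proposed logic: append to the list a family already has.
    static Map<String, List<String>> combineByAppending(List<Map<String, List<String>>> puts) {
        Map<String, List<String>> combined = null;
        for (Map<String, List<String>> p : puts) {
            if (combined == null) {
                combined = p;
                continue;
            }
            for (Map.Entry<String, List<String>> e : p.entrySet()) {
                List<String> existing = combined.get(e.getKey());
                if (existing == null) {
                    combined.put(e.getKey(), e.getValue());
                } else {
                    existing.addAll(e.getValue());
                }
            }
        }
        return combined;
    }

    public static void main(String[] args) {
        // Fresh inputs per call, because both methods reuse the first map.
        System.out.println(combineWithPutAll(samplePuts()));  // {cf=[q3=v3]}
        System.out.println(combineByAppending(samplePuts())); // {cf=[q1=v1, q2=v2, q3=v3]}
    }
}
```

With putAll, only the cells of the last Put survive for a family shared by all Puts; appending keeps every cell, which is the behaviour the description asks for.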