PHOENIX-1973 Improve CsvBulkLoadTool performance by moving KeyValue construction from map phase to reduce phase (Sergey Soldatov)
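The heart of the change: instead of shuffling one full KeyValue per cell, the mapper now packs all non-empty cells of a row into a single byte array (vint column index, KeyValue type byte, vint value length, raw value bytes), and the reducer rebuilds the KeyValues from table metadata, so the row key, column family, and qualifier are no longer repeated for every cell on the wire. Below is a minimal, self-contained sketch of that round trip; it uses plain java.io plus Hadoop's WritableUtils as the patch does, but the class and method names are illustrative, not part of the commit:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.io.WritableUtils;

public class AggregatedRowFormat {
    // Encode (columnIndex, typeByte, value) triples the way writeAggregatedRow does.
    static byte[] encode(int[] indexes, byte[] types, byte[][] values) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        for (int i = 0; i < indexes.length; i++) {
            WritableUtils.writeVInt(out, indexes[i]); // column index replaces family + qualifier
            out.writeByte(types[i]);                  // KeyValue type byte (Put or DeleteColumn)
            WritableUtils.writeVInt(out, values[i].length);
            out.write(values[i]);                     // value bytes only; no rowkey per cell
        }
        out.close();
        return bos.toByteArray();
    }

    // Decode the triples back, as the reducer does before rebuilding KeyValues.
    static void decode(byte[] aggregated) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(aggregated));
        while (in.available() != 0) {
            int index = WritableUtils.readVInt(in);
            byte type = in.readByte();
            byte[] value = new byte[WritableUtils.readVInt(in)];
            in.readFully(value);
            System.out.println("column #" + index + " type=" + type + " value="
                    + new String(value, StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) throws IOException {
        // 4 is the code of KeyValue.Type.Put in HBase
        byte[] row = encode(new int[] {1, 2}, new byte[] {4, 4},
                new byte[][] {"v1".getBytes(StandardCharsets.UTF_8),
                              "v2".getBytes(StandardCharsets.UTF_8)});
        decode(row);
    }
}

The row key itself travels once, in the TableRowkeyPair map-output key, and the table is identified by its list index, so the reducer can translate the index back to a table name and to the family/qualifier pair for each column.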
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e797b36c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e797b36c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e797b36c

Branch: refs/heads/calcite
Commit: e797b36c2ce42e9b9fd6b37fd8b9f79f79d6f18f
Parents: 60ef7cd
Author: Rajeshbabu Chintaguntla <rajeshb...@apache.org>
Authored: Tue Feb 16 12:12:23 2016 +0530
Committer: Rajeshbabu Chintaguntla <rajeshb...@apache.org>
Committed: Tue Feb 16 12:12:23 2016 +0530

----------------------------------------------------------------------
 .../phoenix/mapreduce/AbstractBulkLoadTool.java |   6 +-
 .../mapreduce/FormatToKeyValueMapper.java       | 164 ++++++++++++++++---
 .../mapreduce/FormatToKeyValueReducer.java      | 127 ++++++++++++--
 .../bulkload/TargetTableRefFunctions.java       |  22 ++-
 4 files changed, 281 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e797b36c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index 39ee4b1..ab2848f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -268,7 +269,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
 
         job.setInputFormatClass(TextInputFormat.class);
         job.setMapOutputKeyClass(TableRowkeyPair.class);
-        job.setMapOutputValueClass(KeyValue.class);
+        job.setMapOutputValueClass(ImmutableBytesWritable.class);
         job.setOutputKeyClass(TableRowkeyPair.class);
         job.setOutputValueClass(KeyValue.class);
         job.setReducerClass(FormatToKeyValueReducer.class);
@@ -276,7 +277,10 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
 
         final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
+        final String logicalNamesAsJson = TargetTableRefFunctions.LOGICAN_NAMES_TO_JSON.apply(tablesToBeLoaded);
+
         job.getConfiguration().set(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY, tableNamesAsJson);
+        job.getConfiguration().set(FormatToKeyValueMapper.LOGICAL_NAMES_CONFKEY, logicalNamesAsJson);
 
         // give subclasses their hook
         setupJob(job);
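For orientation, the driver-side effect of the hunks above is pure job wiring: the shuffle value type changes, and two parallel JSON name lists (physical and logical) ride in the job Configuration so the mapper and reducer can exchange table indexes instead of names. A condensed, hypothetical sketch of that wiring (the conf-key strings are copied from the patch; the class and method here are invented for illustration):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Job;

public class BulkLoadJobWiring {
    static Job wire(Configuration conf, String physicalNamesJson, String logicalNamesJson)
            throws IOException {
        Job job = Job.getInstance(conf, "phoenix-bulkload");
        // The map output is now one serialized blob per row, not one KeyValue per cell.
        job.setMapOutputValueClass(ImmutableBytesWritable.class);
        // Both lists must be in the same order: the code identifies a table by its index.
        job.getConfiguration().set("phoenix.mapreduce.import.tablenames", physicalNamesJson);
        job.getConfiguration().set("phoenix.mapreduce.import.logicalnames", logicalNamesJson);
        return job;
    }
}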
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e797b36c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
index 7e115e5..95b099e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
@@ -17,30 +17,30 @@
  */
 package org.apache.phoenix.mapreduce;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.sql.SQLException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 
 import javax.annotation.Nullable;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
 import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
-import org.apache.phoenix.util.ColumnInfo;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.QueryUtil;
-import org.apache.phoenix.util.UpsertExecutor;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,7 +59,7 @@ import com.google.common.collect.Lists;
  * to retrieve {@link ColumnInfo} from the target table.
  */
 public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable, Text, TableRowkeyPair,
-        KeyValue> {
+        ImmutableBytesWritable> {
 
     protected static final Logger LOG = LoggerFactory.getLogger(FormatToKeyValueMapper.class);
 
@@ -79,6 +79,7 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
 
     /** Configuration key for the table names */
     public static final String TABLE_NAMES_CONFKEY = "phoenix.mapreduce.import.tablenames";
+    public static final String LOGICAL_NAMES_CONFKEY = "phoenix.mapreduce.import.logicalnames";
 
     /** Configuration key for the table configurations */
     public static final String TABLE_CONFIG_CONFKEY = "phoenix.mapreduce.import.table.config";
 
@@ -94,8 +95,14 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
     protected UpsertExecutor<RECORD, ?> upsertExecutor;
     protected ImportPreUpsertKeyValueProcessor preUpdateProcessor;
     protected List<String> tableNames;
+    protected List<String> logicalNames;
     protected MapperUpsertListener<RECORD> upsertListener;
 
+    /*
+     * Lookup table for the column index; the index in this list matches the
+     * index in the tableNames list.
+     */
+    protected List<Map<byte[], Map<byte[], Integer>>> columnIndexes;
+
     protected abstract UpsertExecutor<RECORD,?> buildUpsertExecutor(Configuration conf);
     protected abstract LineParser<RECORD> getLineParser();
 
@@ -112,13 +119,17 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
 
         try {
             conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
+
+            final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY);
+            final String logicalNamesConf = conf.get(LOGICAL_NAMES_CONFKEY);
+            tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
+            logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
+
+            columnIndexes = initColumnIndexes();
         } catch (SQLException | ClassNotFoundException e) {
             throw new RuntimeException(e);
         }
 
-        final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY);
-        tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
-
         upsertListener = new MapperUpsertListener<RECORD>(
                 context, conf.getBoolean(IGNORE_INVALID_ROW_CONFKEY, true));
         upsertExecutor = buildUpsertExecutor(conf);
@@ -127,7 +138,8 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
 
     @SuppressWarnings("deprecation")
     @Override
-    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+    protected void map(LongWritable key, Text value, Context context) throws IOException,
+            InterruptedException {
         if (conn == null) {
             throw new RuntimeException("Connection not initialized.");
         }
@@ -145,7 +157,7 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
                 return;
             }
             upsertExecutor.execute(ImmutableList.<RECORD>of(record));
-
+            Map<Integer, List<KeyValue>> map = new HashMap<>();
             Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator
                     = PhoenixRuntime.getUncommittedDataIterator(conn, true);
             while (uncommittedDataIterator.hasNext()) {
@@ -153,24 +165,125 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
                 Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next();
                 List<KeyValue> keyValueList = kvPair.getSecond();
                 keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
                 byte[] first = kvPair.getFirst();
-                for (String tableName : tableNames) {
-                    if (Bytes.compareTo(Bytes.toBytes(tableName), first) != 0) {
-                        // skip edits for other tables
-                        continue;
-                    }
-                    for (KeyValue kv : keyValueList) {
-                        ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
-                        outputKey.set(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
-                        context.write(new TableRowkeyPair(tableName, outputKey), kv);
+                // Create a list of KVs for each table
+                for (int i = 0; i < tableNames.size(); i++) {
+                    if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), first) == 0) {
+                        if (!map.containsKey(i)) {
+                            map.put(i, new ArrayList<KeyValue>());
+                        }
+                        List<KeyValue> list = map.get(i);
+                        for (KeyValue kv : keyValueList) {
+                            list.add(kv);
+                        }
+                        break;
                     }
                 }
             }
+            for (Map.Entry<Integer, List<KeyValue>> rowEntry : map.entrySet()) {
+                int tableIndex = rowEntry.getKey();
+                List<KeyValue> lkv = rowEntry.getValue();
+                // All KVs for the row are combined into a single byte array
+                writeAggregatedRow(context, tableIndex, lkv);
+            }
             conn.rollback();
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
+    private List<Map<byte[], Map<byte[], Integer>>> initColumnIndexes() throws SQLException {
+        List<Map<byte[], Map<byte[], Integer>>> tableMap = new ArrayList<>();
+        int tableIndex;
+        for (tableIndex = 0; tableIndex < tableNames.size(); tableIndex++) {
+            PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(tableIndex));
+            Map<byte[], Map<byte[], Integer>> columnMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+            List<PColumn> cls = table.getColumns();
+            for (int i = 0; i < cls.size(); i++) {
+                PColumn c = cls.get(i);
+                if (c.getFamilyName() == null) continue; // Skip PK column
+                byte[] family = c.getFamilyName().getBytes();
+                byte[] name = c.getName().getBytes();
+                if (!columnMap.containsKey(family)) {
+                    columnMap.put(family, new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR));
+                }
+                Map<byte[], Integer> qualifier = columnMap.get(family);
+                qualifier.put(name, i);
+            }
+            tableMap.add(columnMap);
+        }
+        return tableMap;
+    }
+
+    /**
+     * Find the column index which will replace the column name in
+     * the aggregated array and will be restored in the Reducer.
+     *
+     * @param tableIndex table index in the tableNames list
+     * @param cell KeyValue for the column
+     * @return the column index for the specified cell, or -1 if it was not found
+     */
+    private int findIndex(int tableIndex, Cell cell) {
+        Map<byte[], Map<byte[], Integer>> columnMap = columnIndexes.get(tableIndex);
+        Map<byte[], Integer> qualifiers = columnMap.get(Bytes.copy(cell.getFamilyArray(),
+                cell.getFamilyOffset(), cell.getFamilyLength()));
+        if (qualifiers != null) {
+            Integer result = qualifiers.get(Bytes.copy(cell.getQualifierArray(),
+                    cell.getQualifierOffset(), cell.getQualifierLength()));
+            if (result != null) {
+                return result;
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * Collect all column values for the same row key into a single output value.
+     *
+     * @param context current mapper context
+     * @param tableIndex table index in the tableNames list
+     * @param lkv list of KVs that will be combined into a single ImmutableBytesWritable
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    private void writeAggregatedRow(Context context, int tableIndex, List<KeyValue> lkv)
+            throws IOException, InterruptedException {
+        TrustedByteArrayOutputStream bos = new TrustedByteArrayOutputStream(1024);
+        DataOutputStream outputStream = new DataOutputStream(bos);
+        ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
+        if (!lkv.isEmpty()) {
+            // All KVs in the list share the same row key, so init it only once from the first cell
+            Cell first = lkv.get(0);
+            outputKey.set(first.getRowArray(), first.getRowOffset(), first.getRowLength());
+            for (KeyValue cell : lkv) {
+                if (isEmptyCell(cell)) {
+                    continue;
+                }
+                int i = findIndex(tableIndex, cell);
+                if (i == -1) {
+                    throw new IOException("No column found for KeyValue");
+                }
+                WritableUtils.writeVInt(outputStream, i);
+                outputStream.writeByte(cell.getTypeByte());
+                WritableUtils.writeVInt(outputStream, cell.getValueLength());
+                outputStream.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+            }
+        }
+        ImmutableBytesWritable aggregatedArray = new ImmutableBytesWritable(bos.toByteArray());
+        outputStream.close();
+        context.write(new TableRowkeyPair(Integer.toString(tableIndex), outputKey), aggregatedArray);
+    }
+
+    protected boolean isEmptyCell(KeyValue cell) {
+        return Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(),
+                cell.getQualifierLength(), QueryConstants.EMPTY_COLUMN_BYTES, 0,
+                QueryConstants.EMPTY_COLUMN_BYTES.length) == 0;
+    }
+
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
         try {
@@ -223,11 +336,12 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
     @VisibleForTesting
     static class MapperUpsertListener<T> implements UpsertExecutor.UpsertListener<T> {
-        private final Mapper<LongWritable, Text, TableRowkeyPair, KeyValue>.Context context;
+        private final Mapper<LongWritable, Text,
+                TableRowkeyPair, ImmutableBytesWritable>.Context context;
         private final boolean ignoreRecordErrors;
 
         private MapperUpsertListener(
-                Mapper<LongWritable, Text, TableRowkeyPair, KeyValue>.Context context,
+                Mapper<LongWritable, Text, TableRowkeyPair, ImmutableBytesWritable>.Context context,
                 boolean ignoreRecordErrors) {
             this.context = context;
             this.ignoreRecordErrors = ignoreRecordErrors;
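One subtlety in the mapper's columnIndexes structure above: both levels are keyed on byte[], and Java arrays have identity-based equals/hashCode, so a HashMap keyed on byte[] would never match a freshly copied qualifier. That is why initColumnIndexes uses TreeMaps with Bytes.BYTES_COMPARATOR, which compares contents. A small standalone demo (family "0" is the Phoenix default column family; the column name and index are made up, and computeIfAbsent is just a compact stand-in for the patch's containsKey/put sequence):

import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.hbase.util.Bytes;

public class ColumnIndexLookupDemo {
    public static void main(String[] args) {
        Map<byte[], Map<byte[], Integer>> columnMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        columnMap.computeIfAbsent(Bytes.toBytes("0"),
                f -> new TreeMap<>(Bytes.BYTES_COMPARATOR))
                .put(Bytes.toBytes("MYCOL"), 1); // 1 = the column's position in PTable.getColumns()
        // A lookup with *different* array instances holding the same bytes still succeeds:
        Integer index = columnMap.get(Bytes.toBytes("0")).get(Bytes.toBytes("MYCOL"));
        System.out.println("column index = " + index); // prints 1
    }
}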
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e797b36c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
index 5d00656..0f90e45 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -17,36 +17,143 @@
  */
 package org.apache.phoenix.mapreduce;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
-import java.util.TreeSet;
+import java.sql.SQLException;
+import java.util.*;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
+import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Reducer class for the bulkload jobs.
  * Performs similar functionality to {@link KeyValueSortReducer}
 */
 public class FormatToKeyValueReducer
-        extends Reducer<TableRowkeyPair,KeyValue,TableRowkeyPair,KeyValue> {
+        extends Reducer<TableRowkeyPair,ImmutableBytesWritable,TableRowkeyPair,KeyValue> {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(FormatToKeyValueReducer.class);
+
+    protected List<String> tableNames;
+    protected List<String> logicalNames;
+    protected KeyValueBuilder builder;
+    List<List<Pair<byte[], byte[]>>> columnIndexes;
+    List<ImmutableBytesPtr> emptyFamilyName;
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        Configuration conf = context.getConfiguration();
+
+        // pass client configuration into driver
+        Properties clientInfos = new Properties();
+        for (Map.Entry<String, String> entry : conf) {
+            clientInfos.setProperty(entry.getKey(), entry.getValue());
+        }
+
+        try {
+            PhoenixConnection conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
+            builder = conn.getKeyValueBuilder();
+            final String tableNamesConf = conf.get(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY);
+            final String logicalNamesConf = conf.get(FormatToKeyValueMapper.LOGICAL_NAMES_CONFKEY);
+            tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
+            logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
+
+            columnIndexes = new ArrayList<>(tableNames.size());
+            emptyFamilyName = new ArrayList<>();
+            initColumnsMap(conn);
+        } catch (SQLException | ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void initColumnsMap(PhoenixConnection conn) throws SQLException {
+        for (String tableName : logicalNames) {
+            PTable table = PhoenixRuntime.getTable(conn, tableName);
+            emptyFamilyName.add(SchemaUtil.getEmptyColumnFamilyPtr(table));
+            List<PColumn> cls = table.getColumns();
+            List<Pair<byte[], byte[]>> list = new ArrayList<>(cls.size());
+            for (int i = 0; i < cls.size(); i++) {
+                PColumn c = cls.get(i);
+                if (c.getFamilyName() == null) {
+                    list.add(null); // Skip PK column
+                    continue;
+                }
+                byte[] family = c.getFamilyName().getBytes();
+                byte[] name = c.getName().getBytes();
+                list.add(new Pair<>(family, name));
+            }
+            columnIndexes.add(list);
+        }
+    }
 
     @Override
-    protected void reduce(TableRowkeyPair key, Iterable<KeyValue> values,
-        Reducer<TableRowkeyPair, KeyValue, TableRowkeyPair, KeyValue>.Context context)
+    protected void reduce(TableRowkeyPair key, Iterable<ImmutableBytesWritable> values,
+        Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue>.Context context)
         throws IOException, InterruptedException {
         TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
-        for (KeyValue kv: values) {
-            try {
-                map.add(kv.clone());
-            } catch (CloneNotSupportedException e) {
-                throw new java.io.IOException(e);
+        int tableIndex = Integer.parseInt(key.getTableName());
+        key.setTableName(tableNames.get(tableIndex));
+        List<Pair<byte[], byte[]>> columns = columnIndexes.get(tableIndex);
+        for (ImmutableBytesWritable aggregatedArray : values) {
+            DataInputStream input = new DataInputStream(new ByteArrayInputStream(aggregatedArray.get()));
+            while (input.available() != 0) {
+                int index = WritableUtils.readVInt(input);
+                Pair<byte[], byte[]> pair = columns.get(index);
+                byte type = input.readByte();
+                ImmutableBytesWritable value = null;
+                int len = WritableUtils.readVInt(input);
+                if (len > 0) {
+                    byte[] array = new byte[len];
+                    input.readFully(array);
+                    value = new ImmutableBytesWritable(array);
+                }
+                KeyValue kv;
+                KeyValue.Type kvType = KeyValue.Type.codeToType(type);
+                switch (kvType) {
+                    case Put: // not null value
+                        kv = builder.buildPut(key.getRowkey(),
+                                new ImmutableBytesWritable(pair.getFirst()),
+                                new ImmutableBytesWritable(pair.getSecond()), value);
+                        break;
+                    case DeleteColumn: // null value
+                        kv = builder.buildDeleteColumns(key.getRowkey(),
+                                new ImmutableBytesWritable(pair.getFirst()),
+                                new ImmutableBytesWritable(pair.getSecond()));
+                        break;
+                    default:
+                        throw new IOException("Unsupported KeyValue type " + kvType);
+                }
+                map.add(kv);
             }
+            KeyValue empty = builder.buildPut(key.getRowkey(),
+                    emptyFamilyName.get(tableIndex),
+                    QueryConstants.EMPTY_COLUMN_BYTES_PTR, ByteUtil.EMPTY_BYTE_ARRAY_PTR);
+            map.add(empty);
+            Closeables.closeQuietly(input);
         }
         context.setStatus("Read " + map.getClass());
         int index = 0;
-        for (KeyValue kv: map) {
+        for (KeyValue kv : map) {
             context.write(key, kv);
             if (++index % 100 == 0) context.setStatus("Wrote " + index);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e797b36c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
index d786842..e02065f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
@@ -78,7 +78,25 @@ public class TargetTableRefFunctions {
         }
     };
 
-    public static Function<String,List<String>> NAMES_FROM_JSON = new Function<String,List<String>>() {
+    public static Function<List<TargetTableRef>,String> LOGICAN_NAMES_TO_JSON = new Function<List<TargetTableRef>,String>() {
+
+        @Override
+        public String apply(List<TargetTableRef> input) {
+            try {
+                List<String> tableNames = Lists.newArrayListWithCapacity(input.size());
+                for (TargetTableRef table : input) {
+                    tableNames.add(table.getLogicalName());
+                }
+                ObjectMapper mapper = new ObjectMapper();
+                return mapper.writeValueAsString(tableNames);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    };
+
+    public static Function<String,List<String>> NAMES_FROM_JSON = new Function<String,List<String>>() {
 
         @SuppressWarnings("unchecked")
         @Override
@@ -92,4 +110,4 @@ public class TargetTableRefFunctions {
         }
     };
 
- }
+}
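A closing note on the *_TO_JSON helpers and NAMES_FROM_JSON above: they are plain Jackson round trips of a List<String>, which is why the mapper and reducer can rely on the list order being preserved and exchange bare indexes. A toy round trip, under the assumption of a Jackson 2 ObjectMapper (the actual import in TargetTableRefFunctions may be an older Jackson variant; the table names here are invented):

import java.util.Arrays;
import java.util.List;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NameListRoundTrip {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        List<String> names = Arrays.asList("MY_SCHEMA.MY_TABLE", "MY_SCHEMA.MY_INDEX");
        String json = mapper.writeValueAsString(names); // ["MY_SCHEMA.MY_TABLE","MY_SCHEMA.MY_INDEX"]
        @SuppressWarnings("unchecked")
        List<String> roundTripped = (List<String>) mapper.readValue(json, List.class);
        System.out.println(roundTripped.equals(names)); // true
    }
}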