PHOENIX-2292 Improve performance of direct HBase API index build (Ravi Kishore Valeti)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8958431b Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8958431b Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8958431b Branch: refs/heads/txn Commit: 8958431b379e76d14b30d1641803eab5c89957db Parents: 2e848f6 Author: Thomas D'Silva <[email protected]> Authored: Thu Oct 15 15:37:13 2015 -0700 Committer: Thomas D'Silva <[email protected]> Committed: Thu Oct 15 15:41:08 2015 -0700 ---------------------------------------------------------------------- .../mapreduce/index/DirectHTableWriter.java | 10 +-- .../index/PhoenixIndexImportDirectMapper.java | 92 +++++++++++++++----- 2 files changed, 73 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/8958431b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java index c9512c2..d18fde9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java @@ -18,14 +18,13 @@ package org.apache.phoenix.mapreduce.index; import java.io.IOException; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,10 +94,9 @@ public class DirectHTableWriter { } } - public void write(Mutation mutation) throws IOException { - if (mutation instanceof Put) this.table.put(new Put((Put) mutation)); - else if (mutation instanceof Delete) this.table.delete(new Delete((Delete) mutation)); - else throw new IOException("Pass a Delete or a Put"); + public void write(List<Mutation> mutations) throws IOException, InterruptedException { + Object[] results = new Object[mutations.size()]; + table.batch(mutations, results); } protected Configuration getConf() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/8958431b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java index addbcae..32b66f1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java @@ -33,10 +33,14 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.PhoenixJobCounters; import org.apache.phoenix.mapreduce.util.ConnectionUtil; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.util.ColumnInfo; import org.apache.phoenix.util.PhoenixRuntime; import org.slf4j.Logger; @@ -60,6 +64,10 @@ public class PhoenixIndexImportDirectMapper extends private DirectHTableWriter writer; + private int batchSize; + + private MutationState mutationState; + @Override protected void setup(final Context context) throws IOException, InterruptedException { super.setup(context); @@ -68,8 +76,7 @@ public class PhoenixIndexImportDirectMapper extends try { indxTblColumnMetadata = - PhoenixConfigurationUtil - .getUpsertColumnMetadataList(configuration); + PhoenixConfigurationUtil.getUpsertColumnMetadataList(configuration); indxWritable.setColumnMetadata(indxTblColumnMetadata); final Properties overrideProps = new Properties(); @@ -77,6 +84,14 @@ public class PhoenixIndexImportDirectMapper extends configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE)); connection = ConnectionUtil.getOutputConnection(configuration, overrideProps); connection.setAutoCommit(false); + // Get BatchSize + ConnectionQueryServices services = ((PhoenixConnection) connection).getQueryServices(); + int maxSize = + services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB, + QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); + batchSize = Math.min(((PhoenixConnection) connection).getMutateBatchSize(), maxSize); + LOG.info("Mutation Batch Size = " + batchSize); + final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration); this.pStatement = connection.prepareStatement(upsertQuery); @@ -98,17 +113,22 @@ public class PhoenixIndexImportDirectMapper extends this.pStatement.execute(); final PhoenixConnection pconn = connection.unwrap(PhoenixConnection.class); - final Iterator<Pair<byte[], List<Mutation>>> iterator = - pconn.getMutationState().toMutations(true); + MutationState currentMutationState = pconn.getMutationState(); + if (mutationState == null) { + mutationState = currentMutationState; + return; + } + // Keep accumulating Mutations till batch size + mutationState.join(currentMutationState); - while (iterator.hasNext()) { - Pair<byte[], List<Mutation>> mutationPair = iterator.next(); - for (Mutation mutation : mutationPair.getSecond()) { - writer.write(mutation); - } - context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment(1); + // Write Mutation Batch + if (context.getCounter(PhoenixJobCounters.INPUT_RECORDS).getValue() % batchSize == 0) { + writeBatch(mutationState, context); + mutationState = null; } - connection.rollback(); + + // Make sure progress is reported to Application Master. + context.progress(); } catch (SQLException e) { LOG.error(" Error {} while read/write of a record ", e.getMessage()); context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1); @@ -116,21 +136,47 @@ public class PhoenixIndexImportDirectMapper extends } } + private void writeBatch(MutationState mutationState, Context context) throws IOException, + SQLException, InterruptedException { + final Iterator<Pair<byte[], List<Mutation>>> iterator = mutationState.toMutations(true); + while (iterator.hasNext()) { + Pair<byte[], List<Mutation>> mutationPair = iterator.next(); + + writer.write(mutationPair.getSecond()); + context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment( + mutationPair.getSecond().size()); + } + connection.rollback(); + } + @Override protected void cleanup(Context context) throws IOException, InterruptedException { - // We are writing some dummy key-value as map output here so that we commit only one - // output to reducer. - context.write(new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()), - new IntWritable(0)); - super.cleanup(context); - if (connection != null) { - try { - connection.close(); - } catch (SQLException e) { - LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ", - e.getMessage()); + try { + // Write the last & final Mutation Batch + if (mutationState != null) { + writeBatch(mutationState, context); + } + // We are writing some dummy key-value as map output here so that we commit only one + // output to reducer. + context.write(new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()), + new IntWritable(0)); + super.cleanup(context); + } catch (SQLException e) { + LOG.error(" Error {} while read/write of a record ", e.getMessage()); + context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1); + throw new RuntimeException(e); + } finally { + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ", + e.getMessage()); + } + } + if (writer != null) { + writer.close(); } } - writer.close(); } }
