Repository: phoenix Updated Branches: refs/heads/4.x-HBase-0.98 cd8f0535b -> bfd511827
PHOENIX-541 Make mutable batch size bytes-based instead of row-based (Geoffrey Jacoby) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/bfd51182 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/bfd51182 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/bfd51182 Branch: refs/heads/4.x-HBase-0.98 Commit: bfd5118276c885f44980c3446451e39da238d250 Parents: cd8f053 Author: Samarth <[email protected]> Authored: Thu Jan 26 15:42:56 2017 -0800 Committer: Samarth <[email protected]> Committed: Thu Jan 26 15:42:56 2017 -0800 ---------------------------------------------------------------------- .../org/apache/phoenix/end2end/QueryMoreIT.java | 35 ++++++++++++++ .../end2end/UpsertSelectAutoCommitIT.java | 2 +- .../apache/phoenix/end2end/UpsertSelectIT.java | 2 +- .../phoenix/end2end/index/ImmutableIndexIT.java | 1 - .../org/apache/phoenix/tx/TransactionIT.java | 2 +- .../org/apache/phoenix/tx/TxCheckpointIT.java | 2 +- .../UngroupedAggregateRegionObserver.java | 31 +++++++----- .../apache/phoenix/execute/MutationState.java | 51 ++++++++++++++++++-- .../apache/phoenix/jdbc/PhoenixConnection.java | 10 +++- .../index/PhoenixIndexImportDirectMapper.java | 19 +++++--- .../org/apache/phoenix/query/QueryServices.java | 3 ++ .../phoenix/query/QueryServicesOptions.java | 7 ++- .../java/org/apache/phoenix/util/JDBCUtil.java | 6 +++ .../org/apache/phoenix/util/PhoenixRuntime.java | 12 ++++- .../apache/phoenix/jdbc/PhoenixDriverTest.java | 9 ++++ .../org/apache/phoenix/util/JDBCUtilTest.java | 19 ++++++++ 16 files changed, 181 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/bfd51182/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java index 2b27f00..a2dab16 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java @@ -37,6 +37,8 @@ import java.util.Properties; import org.apache.hadoop.hbase.util.Base64; import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.TestUtil; import org.junit.Test; @@ -471,4 +473,37 @@ public class QueryMoreIT extends ParallelStatsDisabledIT { assertFalse(rs.next()); } } + + @Test + public void testMutationBatch() throws Exception { + Properties connectionProperties = new Properties(); + connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, "1024"); + PhoenixConnection connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties); + String fullTableName = generateUniqueName(); + try (Statement stmt = connection.createStatement()) { + stmt.execute("CREATE TABLE " + fullTableName + "(\n" + + " ORGANIZATION_ID CHAR(15) NOT NULL,\n" + + " SCORE DOUBLE NOT NULL,\n" + + " ENTITY_ID CHAR(15) NOT NULL\n" + + " CONSTRAINT PAGE_SNAPSHOT_PK PRIMARY KEY (\n" + + " ORGANIZATION_ID,\n" + + " SCORE DESC,\n" + + " ENTITY_ID DESC\n" + + " )\n" + + ") MULTI_TENANT=TRUE"); + } + PreparedStatement stmt = connection.prepareStatement("upsert into " + fullTableName + + " (organization_id, entity_id, score) values (?,?,?)"); + try { + for (int i = 0; i < 4; i++) { + stmt.setString(1, "AAAA" + i); + stmt.setString(2, "BBBB" + i); + stmt.setInt(3, 1); + stmt.execute(); + } + connection.commit(); + } catch (IllegalArgumentException expected) {} + + assertEquals(2L, connection.getMutationState().getBatchCount()); + } } 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bfd51182/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java index 37482de..6b781a0 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java @@ -152,7 +152,7 @@ public class UpsertSelectAutoCommitIT extends ParallelStatsDisabledIT { @Test public void testUpsertSelectDoesntSeeUpsertedData() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3)); + props.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(512)); props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3)); props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3)); Connection conn = DriverManager.getConnection(getUrl(), props); http://git-wip-us.apache.org/repos/asf/phoenix/blob/bfd51182/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java index 763f11b..f5905ee 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java @@ -1435,7 +1435,7 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT { long ts = nextTimestamp(); Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts)); - props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3)); + props.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(512)); props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3)); props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3)); Connection conn = DriverManager.getConnection(getUrl(), props); http://git-wip-us.apache.org/repos/asf/phoenix/blob/bfd51182/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java index 3ee9721..bc301fa 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java @@ -154,7 +154,6 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT { return; } Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(100)); String tableName = "TBL_" + generateUniqueName(); String indexName = "IND_" + generateUniqueName(); String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); http://git-wip-us.apache.org/repos/asf/phoenix/blob/bfd51182/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java index 83128f1..bde5cc8 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java @@ -603,7 +603,7 @@ public class TransactionIT 
extends ParallelStatsDisabledIT { @Test public void testParallelUpsertSelect() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3)); + props.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(512)); props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3)); props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3)); Connection conn = DriverManager.getConnection(getUrl(), props); http://git-wip-us.apache.org/repos/asf/phoenix/blob/bfd51182/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java index 14bcd70..246ecd4 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java @@ -80,7 +80,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { String seqName = "SEQ_" + generateUniqueName(); String fullTableName = SchemaUtil.getTableName(tableName, tableName); Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3)); + props.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(512)); props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3)); props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3)); Connection conn = getConnection(props); http://git-wip-us.apache.org/repos/asf/phoenix/blob/bfd51182/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 0d0f0c2..0d22ae0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -22,6 +22,7 @@ import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY; import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; +import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB; import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.COMPACTION_UPDATE_STATS_ROW_COUNT; import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.CONCURRENT_UPDATE_STATS_ROW_COUNT; @@ -70,6 +71,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.coprocessor.generated.PTableProtos; import org.apache.phoenix.exception.DataExceedsCapacityException; +import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.ExpressionType; @@ -342,6 +344,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } int batchSize = 0; + long batchSizeBytes = 0L; List<Mutation> mutations = Collections.emptyList(); boolean needToWrite = false; Configuration conf = c.getEnvironment().getConfiguration(); @@ -368,6 +371,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver // TODO: size better mutations = Lists.newArrayListWithExpectedSize(1024); batchSize = 
env.getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); + batchSizeBytes = env.getConfiguration().getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB, + QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES); } Aggregators aggregators = ServerAggregators.deserialize( scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS), env.getConfiguration()); @@ -595,19 +600,23 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver mutations.add(put); } } - // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config - if (!mutations.isEmpty() && batchSize > 0 && - mutations.size() % batchSize == 0) { - commitBatch(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, - txState); - mutations.clear(); + // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config + List<List<Mutation>> batchMutationList = + MutationState.getMutationBatchList(batchSize, batchSizeBytes, mutations); + for (List<Mutation> batchMutations : batchMutationList) { + commitBatch(region, batchMutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, + txState); + batchMutations.clear(); } - // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config - if (!indexMutations.isEmpty() && batchSize > 0 && - indexMutations.size() % batchSize == 0) { - commitBatch(region, indexMutations, null, blockingMemStoreSize, null, txState); - indexMutations.clear(); + mutations.clear(); + // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config + List<List<Mutation>> batchIndexMutationList = + MutationState.getMutationBatchList(batchSize, batchSizeBytes, indexMutations); + for (List<Mutation> batchIndexMutations : batchIndexMutationList) { + commitBatch(region, batchIndexMutations, null, blockingMemStoreSize, null, txState); + batchIndexMutations.clear(); } + indexMutations.clear(); } aggregators.aggregate(rowAggregators, result); hasAny = true; 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bfd51182/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index cb66968..6889ca4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -126,6 +126,8 @@ public class MutationState implements SQLCloseable { private final PhoenixConnection connection; private final long maxSize; + private final long maxSizeBytes; + private long batchCount = 0L; private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations; private final List<TransactionAware> txAwares; private final TransactionContext txContext; @@ -140,7 +142,7 @@ public class MutationState implements SQLCloseable { private final MutationMetricQueue mutationMetricQueue; private ReadMetricQueue readMetricQueue; - + public MutationState(long maxSize, PhoenixConnection connection) { this(maxSize,connection, null, null); } @@ -171,6 +173,7 @@ public class MutationState implements SQLCloseable { Transaction tx, TransactionContext txContext) { this.maxSize = maxSize; this.connection = connection; + this.maxSizeBytes = connection.getMutateBatchSizeBytes(); this.mutations = mutations; boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled(); this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue() @@ -743,12 +746,11 @@ public class MutationState implements SQLCloseable { public static long getMutationTimestamp(final Long tableTimestamp, Long scn) { return (tableTimestamp!=null && tableTimestamp!=QueryConstants.UNSET_TIMESTAMP) ? tableTimestamp : (scn == null ? 
HConstants.LATEST_TIMESTAMP : scn); } - + /** * Validates that the meta data is valid against the server meta data if we haven't yet done so. * Otherwise, for every UPSERT VALUES call, we'd need to hit the server to see if the meta data * has changed. - * @param connection * @return the server time to use for the upsert * @throws SQLException if the table or any columns no longer exist */ @@ -842,6 +844,15 @@ public class MutationState implements SQLCloseable { } } } + + public long getMaxSizeBytes() { + return maxSizeBytes; + } + + public long getBatchCount() { + return batchCount; + } + private class MetaDataAwareHTable extends DelegateHTable { private final TableRef tableRef; @@ -1068,7 +1079,11 @@ public class MutationState implements SQLCloseable { long startTime = System.currentTimeMillis(); child.addTimelineAnnotation("Attempt " + retryCount); - hTable.batch(mutationList); + List<List<Mutation>> mutationBatchList = getMutationBatchList(maxSize, maxSizeBytes, mutationList); + for (List<Mutation> mutationBatch : mutationBatchList) { + hTable.batch(mutationBatch); + batchCount++; + } if (logger.isDebugEnabled()) logger.debug("Sent batch of " + numMutations + " for " + Bytes.toString(htableName)); child.stop(); child.stop(); @@ -1132,6 +1147,34 @@ public class MutationState implements SQLCloseable { } } + /** + * Split the list of mutations into multiple lists that don't exceed row and byte thresholds + * @param allMutationList List of HBase mutations + * @return List of lists of mutations + */ + public static List<List<Mutation>> getMutationBatchList(long maxSize, long maxSizeBytes, List<Mutation> allMutationList) { + List<List<Mutation>> mutationBatchList = Lists.newArrayList(); + List<Mutation> currentList = Lists.newArrayList(); + long currentBatchSizeBytes = 0L; + for (Mutation mutation : allMutationList) { + long mutationSizeBytes = mutation.heapSize(); + if (currentList.size() == maxSize || currentBatchSizeBytes + mutationSizeBytes > maxSizeBytes) { + if 
(currentList.size() > 0) { + mutationBatchList.add(currentList); + currentList = Lists.newArrayList(); + currentBatchSizeBytes = 0L; + } + } + currentList.add(mutation); + currentBatchSizeBytes += mutationSizeBytes; + } + if (currentList.size() > 0) { + mutationBatchList.add(currentList); + } + return mutationBatchList; + + } + public byte[] encodeTransaction() throws SQLException { try { return CODEC.encode(getTransaction()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/bfd51182/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index f465176..9f2f4fd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -140,6 +140,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea private final Properties info; private final Map<PDataType<?>, Format> formatters = new HashMap<>(); private final int mutateBatchSize; + private final long mutateBatchSizeBytes; private final Long scn; private MutationState mutationState; private List<SQLCloseable> statements = new ArrayList<SQLCloseable>(); @@ -249,6 +250,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea this.services.getProps().get(QueryServices.SCHEMA_ATTRIB, QueryServicesOptions.DEFAULT_SCHEMA)); this.tenantId = tenantId; this.mutateBatchSize = JDBCUtil.getMutateBatchSize(url, this.info, this.services.getProps()); + this.mutateBatchSizeBytes = JDBCUtil.getMutateBatchSizeBytes(url, this.info, this.services.getProps()); datePattern = this.services.getProps().get(QueryServices.DATE_FORMAT_ATTRIB, DateUtil.DEFAULT_DATE_FORMAT); timePattern = 
this.services.getProps().get(QueryServices.TIME_FORMAT_ATTRIB, DateUtil.DEFAULT_TIME_FORMAT); timestampPattern = this.services.getProps().get(QueryServices.TIMESTAMP_FORMAT_ATTRIB, DateUtil.DEFAULT_TIMESTAMP_FORMAT); @@ -437,7 +439,11 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea public int getMutateBatchSize() { return mutateBatchSize; } - + + public long getMutateBatchSizeBytes(){ + return mutateBatchSizeBytes; + } + public PMetaData getMetaDataCache() { return metaData; } @@ -451,7 +457,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } protected MutationState newMutationState(int maxSize) { - return new MutationState(maxSize, this); + return new MutationState(maxSize, this); } public MutationState getMutationState() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/bfd51182/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java index 15e55dd..c1db27c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java @@ -65,6 +65,7 @@ public class PhoenixIndexImportDirectMapper extends private DirectHTableWriter writer; private int batchSize; + private long batchSizeBytes; private MutationState mutationState; @@ -87,12 +88,16 @@ public class PhoenixIndexImportDirectMapper extends } connection = ConnectionUtil.getOutputConnection(configuration, overrideProps); connection.setAutoCommit(false); - // Get BatchSize + // Get BatchSize, which is in terms of rows ConnectionQueryServices services = ((PhoenixConnection) 
connection).getQueryServices(); int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); batchSize = Math.min(((PhoenixConnection) connection).getMutateBatchSize(), maxSize); + + //Get batch size in terms of bytes + batchSizeBytes = ((PhoenixConnection) connection).getMutateBatchSizeBytes(); + LOG.info("Mutation Batch Size = " + batchSize); final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration); @@ -107,8 +112,6 @@ public class PhoenixIndexImportDirectMapper extends protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context) throws IOException, InterruptedException { - context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1); - try { final List<Object> values = record.getValues(); indxWritable.setValues(values); @@ -119,7 +122,6 @@ public class PhoenixIndexImportDirectMapper extends MutationState currentMutationState = pconn.getMutationState(); if (mutationState == null) { mutationState = currentMutationState; - return; } // Keep accumulating Mutations till batch size mutationState.join(currentMutationState); @@ -137,6 +139,7 @@ public class PhoenixIndexImportDirectMapper extends context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1); throw new RuntimeException(e); } + context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1); } private void writeBatch(MutationState mutationState, Context context) throws IOException, @@ -144,8 +147,12 @@ public class PhoenixIndexImportDirectMapper extends final Iterator<Pair<byte[], List<Mutation>>> iterator = mutationState.toMutations(true, null); while (iterator.hasNext()) { Pair<byte[], List<Mutation>> mutationPair = iterator.next(); - - writer.write(mutationPair.getSecond()); + List<Mutation> batchMutations = mutationPair.getSecond(); + List<List<Mutation>> batchOfBatchMutations = + MutationState.getMutationBatchList(batchSize, batchSizeBytes, batchMutations); + for 
(List<Mutation> mutationList : batchOfBatchMutations) { + writer.write(mutationList); + } context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment( mutationPair.getSecond().size()); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/bfd51182/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index dc949ea..585d0ae 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -85,7 +85,10 @@ public interface QueryServices extends SQLCloseable { public static final String CALL_QUEUE_ROUND_ROBIN_ATTRIB = "ipc.server.callqueue.roundrobin"; public static final String SCAN_CACHE_SIZE_ATTRIB = "hbase.client.scanner.caching"; public static final String MAX_MUTATION_SIZE_ATTRIB = "phoenix.mutate.maxSize"; + + @Deprecated //USE MUTATE_BATCH_SIZE_BYTES_ATTRIB instead public static final String MUTATE_BATCH_SIZE_ATTRIB = "phoenix.mutate.batchSize"; + public static final String MUTATE_BATCH_SIZE_BYTES_ATTRIB = "phoenix.mutate.batchSizeBytes"; public static final String MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxServerCacheTimeToLiveMs"; @Deprecated // Use FORCE_ROW_KEY_ORDER instead. 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bfd51182/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index a15009a..43436b8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -127,7 +127,10 @@ public class QueryServicesOptions { public static final boolean DEFAULT_DROP_METADATA = true; // Drop meta data also. public static final long DEFAULT_DRIVER_SHUTDOWN_TIMEOUT_MS = 5 * 1000; // Time to wait in ShutdownHook to exit gracefully. - public final static int DEFAULT_MUTATE_BATCH_SIZE = 1000; // Batch size for UPSERT SELECT and DELETE + @Deprecated //use DEFAULT_MUTATE_BATCH_SIZE_BYTES + public final static int DEFAULT_MUTATE_BATCH_SIZE = 100; // Batch size for UPSERT SELECT and DELETE + //Batch size in bytes for UPSERT, SELECT and DELETE. By default, 128MB + public final static long DEFAULT_MUTATE_BATCH_SIZE_BYTES = 134217728; // The only downside of it being out-of-sync is that the parallelization of the scan won't be as balanced as it could be. 
public static final int DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS = 30000; // 30 sec (with no activity) public static final int DEFAULT_SCAN_CACHE_SIZE = 1000; @@ -445,6 +448,7 @@ public class QueryServicesOptions { return set(MAX_MUTATION_SIZE_ATTRIB, maxMutateSize); } + @Deprecated public QueryServicesOptions setMutateBatchSize(int mutateBatchSize) { return set(MUTATE_BATCH_SIZE_ATTRIB, mutateBatchSize); } @@ -509,6 +513,7 @@ public class QueryServicesOptions { return config.getInt(MAX_MUTATION_SIZE_ATTRIB, DEFAULT_MAX_MUTATION_SIZE); } + @Deprecated public int getMutateBatchSize() { return config.getInt(MUTATE_BATCH_SIZE_ATTRIB, DEFAULT_MUTATE_BATCH_SIZE); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/bfd51182/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java index a0672d4..c081904 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java @@ -126,11 +126,17 @@ public class JDBCUtil { return (scnStr == null ? null : Long.parseLong(scnStr)); } + @Deprecated // use getMutateBatchSizeBytes public static int getMutateBatchSize(String url, Properties info, ReadOnlyProps props) throws SQLException { String batchSizeStr = findProperty(url, info, PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB); return (batchSizeStr == null ? props.getInt(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE) : Integer.parseInt(batchSizeStr)); } + public static long getMutateBatchSizeBytes(String url, Properties info, ReadOnlyProps props) throws SQLException { + String batchSizeStr = findProperty(url, info, PhoenixRuntime.UPSERT_BATCH_SIZE_BYTES_ATTRIB); + return (batchSizeStr == null ? 
props.getLong(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES) : Long.parseLong(batchSizeStr)); + } + public static @Nullable PName getTenantId(String url, Properties info) throws SQLException { String tenantId = findProperty(url, info, PhoenixRuntime.TENANT_ID_ATTRIB); return (tenantId == null ? null : PNameFactory.newName(tenantId)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/bfd51182/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java index 1d9f946..04eb061 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java @@ -147,6 +147,16 @@ public class PhoenixRuntime { public final static String UPSERT_BATCH_SIZE_ATTRIB = "UpsertBatchSize"; /** + * Use this connection property to control the number of bytes that are + * batched together on an UPSERT INTO table1... SELECT ... FROM table2. + * It's only used when autoCommit is true and your source table is + * different than your target table or your SELECT statement has a + * GROUP BY clause. Overrides the value of UpsertBatchSize. + */ + public final static String UPSERT_BATCH_SIZE_BYTES_ATTRIB = "UpsertBatchSizeBytes"; + + + /** * Use this connection property to explicitly enable or disable auto-commit on a new connection. */ public static final String AUTO_COMMIT_ATTRIB = "AutoCommit"; @@ -877,7 +887,7 @@ public class PhoenixRuntime { * Column names and family names are enclosed in double quotes to allow for case sensitivity and for presence of * special characters. Salting column and view index id column are not included. 
If the connection is tenant specific * and the table used by the query plan is multi-tenant, then the tenant id column is not included as well. - * @param datatypes - Initialized empty list to be filled with the corresponding data type for the columns in @param columns. + * @param dataTypes - Initialized empty list to be filled with the corresponding data type for the columns in @param columns. * @param plan - query plan to get info for * @param conn - phoenix connection used to generate the query plan. Caller should take care of closing the connection appropriately. * @param forDataTable - if true, then column names and data types correspond to the data table even if the query plan uses http://git-wip-us.apache.org/repos/asf/phoenix/blob/bfd51182/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java index 4ab75a9..c87c2db 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java @@ -77,6 +77,15 @@ public class PhoenixDriverTest extends BaseConnectionlessQueryTest { fail("Upsert should have failed since the number of upserts (200) is greater than the MAX_MUTATION_SIZE_ATTRIB (100)"); } catch (IllegalArgumentException expected) {} } + + @Test + public void testMaxMutationSizeInBytesSetCorrectly() throws Exception { + Properties connectionProperties = new Properties(); + connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB,"100"); + PhoenixConnection connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties); + assertEquals(100L, connection.getMutateBatchSizeBytes()); + assertEquals(100L, connection.getMutationState().getMaxSizeBytes()); + } @Test 
public void testDisallowNegativeScn() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/bfd51182/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java index 8c270c3..241a120 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/JDBCUtilTest.java @@ -27,8 +27,11 @@ import static org.junit.Assert.assertTrue; import java.util.Map; import java.util.Properties; +import org.apache.phoenix.query.QueryServices; import org.junit.Test; +import com.google.common.collect.Maps; + public class JDBCUtilTest { @Test @@ -109,4 +112,20 @@ public class JDBCUtilTest { props.setProperty("AutoCommit", "false"); assertFalse(JDBCUtil.getAutoCommit("localhost", props, false)); } + + @Test + public void testGetMaxMutateBytes() throws Exception { + assertEquals(1000L, JDBCUtil.getMutateBatchSizeBytes("localhost;" + PhoenixRuntime.UPSERT_BATCH_SIZE_BYTES_ATTRIB + + "=1000", new Properties(), ReadOnlyProps.EMPTY_PROPS)); + + Properties props = new Properties(); + props.setProperty(PhoenixRuntime.UPSERT_BATCH_SIZE_BYTES_ATTRIB, "2000"); + assertEquals(2000L, JDBCUtil.getMutateBatchSizeBytes("localhost", props, ReadOnlyProps.EMPTY_PROPS)); + + Map<String, String> propMap = Maps.newHashMap(); + propMap.put(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, "3000"); + ReadOnlyProps readOnlyProps = new ReadOnlyProps(propMap); + assertEquals(3000L, JDBCUtil.getMutateBatchSizeBytes("localhost", new Properties(), readOnlyProps)); + } + }
