Higher memory consumption on the RegionServer (RS), leading to OOM/abort during immutable index creation when multiple regions are hosted on a single RS
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8f6d02f7 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8f6d02f7 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8f6d02f7 Branch: refs/heads/calcite Commit: 8f6d02f79871eb8a2458ca7aecfb10b3ebf34e7b Parents: c8612fa Author: Ankit Singhal <ankitsingha...@gmail.com> Authored: Mon Mar 6 14:58:01 2017 +0530 Committer: Ankit Singhal <ankitsingha...@gmail.com> Committed: Mon Mar 6 14:58:01 2017 +0530 ---------------------------------------------------------------------- .../apache/phoenix/compile/UpsertCompiler.java | 20 ++++- .../UngroupedAggregateRegionObserver.java | 86 +++++++++++++------- .../apache/phoenix/schema/MetaDataClient.java | 60 ++++++++++++-- 3 files changed, 128 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f6d02f7/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 7a285a9..260e591 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -24,6 +24,7 @@ import java.sql.ParameterMetaData; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; +import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; import java.util.Collections; @@ -106,6 +107,7 @@ import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; +import org.apache.phoenix.util.PhoenixRuntime; import 
org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; @@ -756,6 +758,10 @@ public class UpsertCompiler { Tuple row = iterator.next(); final long mutationCount = (Long)aggProjector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr); + for (PTable index : getNewIndexes(table)) { + new MetaDataClient(connection).buildIndex(index, tableRef, + scan.getTimeRange().getMax(), scan.getTimeRange().getMax() + 1); + } return new MutationState(maxSize, connection) { @Override public long getUpdateCount() { @@ -767,7 +773,19 @@ public class UpsertCompiler { } } - + + private List<PTable> getNewIndexes(PTable table) throws SQLException { + List<PTable> indexes = table.getIndexes(); + List<PTable> newIndexes = new ArrayList<PTable>(2); + PTable newTable = PhoenixRuntime.getTableNoCache(connection, table.getName().getString()); + for (PTable index : newTable.getIndexes()) { + if (!indexes.contains(index)) { + newIndexes.add(index); + } + } + return newIndexes; + } + @Override public ExplainPlan getExplainPlan() throws SQLException { List<String> queryPlanSteps = aggPlan.getExplainPlan().getPlanSteps(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f6d02f7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index c5854d3..2dec235 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -35,7 +35,6 @@ import java.security.PrivilegedExceptionAction; import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import 
java.util.List; import java.util.Set; import java.util.concurrent.Callable; @@ -59,6 +58,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.Region; @@ -72,7 +72,6 @@ import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.coprocessor.generated.PTableProtos; import org.apache.phoenix.exception.DataExceedsCapacityException; -import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.ExpressionType; @@ -127,6 +126,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.google.common.primitives.Ints; /** @@ -288,6 +288,41 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver return s; } + class MutationList extends ArrayList<Mutation> implements HeapSize { + private long heapSize = 0l; + public MutationList() { + super(); + } + + public MutationList(int size){ + super(size); + } + + @Override + public boolean add(Mutation e) { + boolean r = super.add(e); + if (r) { + incrementHeapSize(e.heapSize()); + } + return r; + } + + @Override + public long heapSize() { + return heapSize; + } + + private void incrementHeapSize(long heapSize) { + this.heapSize += heapSize; + } + + @Override + public void clear() { + heapSize = 0l; + super.clear(); + } + } + @Override protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, 
final RegionScanner s) throws IOException, SQLException { RegionCoprocessorEnvironment env = c.getEnvironment(); @@ -339,7 +374,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD); } List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto); - List<Mutation> indexMutations = localIndexBytes == null ? Collections.<Mutation>emptyList() : Lists.<Mutation>newArrayListWithExpectedSize(1024); + MutationList indexMutations = localIndexBytes == null ? new MutationList() : new MutationList(1024); RegionScanner theScanner = s; @@ -395,9 +430,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), env, useQualifierAsIndex, useNewValueColumnQualifier); } - int batchSize = 0; - long batchSizeBytes = 0L; - List<Mutation> mutations = Collections.emptyList(); + int maxBatchSize = 0; + long maxBatchSizeBytes = 0L; + MutationList mutations = new MutationList(); boolean needToWrite = false; Configuration conf = c.getEnvironment().getConfiguration(); long flushSize = region.getTableDesc().getMemStoreFlushSize(); @@ -420,10 +455,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan; if (isDescRowKeyOrderUpgrade || isDelete || isUpsert || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) { needToWrite = true; - // TODO: size better - mutations = Lists.newArrayListWithExpectedSize(1024); - batchSize = env.getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); - batchSizeBytes = env.getConfiguration().getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB, + maxBatchSize = env.getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, 
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); + mutations = new MutationList(Ints.saturatedCast(maxBatchSize + maxBatchSize / 10)); + maxBatchSizeBytes = env.getConfiguration().getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES); } Aggregators aggregators = ServerAggregators.deserialize( @@ -666,22 +700,17 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver mutations.add(put); } } - // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config - List<List<Mutation>> batchMutationList = - MutationState.getMutationBatchList(batchSize, batchSizeBytes, mutations); - for (List<Mutation> batchMutations : batchMutationList) { - commit(region, batchMutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, - txState, areMutationInSameRegion, targetHTable, useIndexProto); - batchMutations.clear(); - } + } + if (readyToCommit(mutations, maxBatchSize, maxBatchSizeBytes)) { + commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState, + areMutationInSameRegion, targetHTable, useIndexProto); mutations.clear(); - // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config - List<List<Mutation>> batchIndexMutationList = - MutationState.getMutationBatchList(batchSize, batchSizeBytes, indexMutations); - for (List<Mutation> batchIndexMutations : batchIndexMutationList) { - commitBatch(region, batchIndexMutations, null, blockingMemStoreSize, null, txState, useIndexProto); - batchIndexMutations.clear(); - } + } + // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config + + if (readyToCommit(indexMutations, maxBatchSize, maxBatchSizeBytes)) { + commitBatch(region, indexMutations, null, blockingMemStoreSize, null, txState, + useIndexProto); indexMutations.clear(); } aggregators.aggregate(rowAggregators, result); @@ -774,10 +803,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver return false; } - private 
boolean readyToCommit(List<Mutation> mutations,int batchSize){ - return !mutations.isEmpty() && batchSize > 0 && - mutations.size() > batchSize; + private boolean readyToCommit(MutationList mutations, int maxBatchSize, long maxBatchSizeBytes) { + return !mutations.isEmpty() && (maxBatchSize > 0 && mutations.size() > maxBatchSize) + || (maxBatchSizeBytes > 0 && mutations.heapSize() > maxBatchSizeBytes); } + @Override public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, final InternalScanner scanner, final ScanType scanType) throws IOException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f6d02f7/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 262047c..f2820f2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -1226,21 +1226,63 @@ public class MetaDataClient { } throw new IllegalStateException(); // impossible } + + /** + * For new mutations only should not be used if there are deletes done in the data table between start time and end + * time passed to the method. 
+ */ + public MutationState buildIndex(PTable index, TableRef dataTableRef, long startTime, long EndTime) + throws SQLException { + boolean wasAutoCommit = connection.getAutoCommit(); + try { + AlterIndexStatement indexStatement = FACTORY + .alterIndex( + FACTORY.namedTable(null, + TableName.create(index.getSchemaName().getString(), + index.getTableName().getString())), + dataTableRef.getTable().getTableName().getString(), false, PIndexState.INACTIVE); + alterIndex(indexStatement); + connection.setAutoCommit(true); + MutationPlan mutationPlan = getMutationPlanForBuildingIndex(index, dataTableRef); + Scan scan = mutationPlan.getContext().getScan(); + try { + scan.setTimeRange(startTime, EndTime); + } catch (IOException e) { + throw new SQLException(e); + } + MutationState state = connection.getQueryServices().updateData(mutationPlan); + indexStatement = FACTORY + .alterIndex( + FACTORY.namedTable(null, + TableName.create(index.getSchemaName().getString(), + index.getTableName().getString())), + dataTableRef.getTable().getTableName().getString(), false, PIndexState.ACTIVE); + alterIndex(indexStatement); + return state; + } finally { + connection.setAutoCommit(wasAutoCommit); + } + } + + private MutationPlan getMutationPlanForBuildingIndex(PTable index, TableRef dataTableRef) throws SQLException { + MutationPlan mutationPlan; + if (index.getIndexType() == IndexType.LOCAL) { + PostLocalIndexDDLCompiler compiler = + new PostLocalIndexDDLCompiler(connection, getFullTableName(dataTableRef)); + mutationPlan = compiler.compile(index); + } else { + PostIndexDDLCompiler compiler = new PostIndexDDLCompiler(connection, dataTableRef); + mutationPlan = compiler.compile(index); + } + return mutationPlan; + } private MutationState buildIndex(PTable index, TableRef dataTableRef) throws SQLException { AlterIndexStatement indexStatement = null; boolean wasAutoCommit = connection.getAutoCommit(); try { connection.setAutoCommit(true); - MutationPlan mutationPlan; - if 
(index.getIndexType() == IndexType.LOCAL) { - PostLocalIndexDDLCompiler compiler = - new PostLocalIndexDDLCompiler(connection, getFullTableName(dataTableRef)); - mutationPlan = compiler.compile(index); - } else { - PostIndexDDLCompiler compiler = new PostIndexDDLCompiler(connection, dataTableRef); - mutationPlan = compiler.compile(index); - } + MutationPlan mutationPlan = getMutationPlanForBuildingIndex(index, dataTableRef); Scan scan = mutationPlan.getContext().getScan(); Long scn = connection.getSCN(); try {