Higher memory consumption on RS leading to OOM/abort on immutable index creation with multiple regions on single RS


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8f6d02f7
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8f6d02f7
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8f6d02f7

Branch: refs/heads/system-catalog
Commit: 8f6d02f79871eb8a2458ca7aecfb10b3ebf34e7b
Parents: c8612fa
Author: Ankit Singhal <ankitsingha...@gmail.com>
Authored: Mon Mar 6 14:58:01 2017 +0530
Committer: Ankit Singhal <ankitsingha...@gmail.com>
Committed: Mon Mar 6 14:58:01 2017 +0530

----------------------------------------------------------------------
 .../apache/phoenix/compile/UpsertCompiler.java  | 20 ++++-
 .../UngroupedAggregateRegionObserver.java       | 86 +++++++++++++-------
 .../apache/phoenix/schema/MetaDataClient.java   | 60 ++++++++++++--
 3 files changed, 128 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
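
The gist of the fix: the coprocessor previously buffered every mutation
produced by a server-side scan and only split the list into batches at
commit time, so a region server hosting many regions of the table could
hold the whole result set in memory and OOM/abort. The patch instead
commits incrementally, tracking the accumulated heap size of the pending
mutations and flushing as soon as a row-count or byte-size threshold is
crossed. A minimal, self-contained Java sketch of that idea follows; it
is an illustration, not the Phoenix classes themselves (the Sized
interface and SizeTrackedList name are made up):

    import java.util.ArrayList;

    // Stand-in for an HBase Mutation: anything that reports its heap size.
    interface Sized { long heapSize(); }

    class SizeTrackedList<T extends Sized> extends ArrayList<T> {
        private long heapSize = 0L;

        @Override
        public boolean add(T e) {
            boolean added = super.add(e);
            if (added) heapSize += e.heapSize(); // track bytes as we buffer
            return added;
        }

        @Override
        public void clear() {
            heapSize = 0L; // reset the tally together with the elements
            super.clear();
        }

        long heapSize() { return heapSize; }

        // Flush when either the row-count or the byte-size cap is exceeded.
        boolean readyToCommit(int maxRows, long maxBytes) {
            return !isEmpty() && ((maxRows > 0 && size() > maxRows)
                    || (maxBytes > 0 && heapSize > maxBytes));
        }
    }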


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f6d02f7/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 7a285a9..260e591 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -24,6 +24,7 @@ import java.sql.ParameterMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
@@ -106,6 +107,7 @@ import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
@@ -756,6 +758,10 @@ public class UpsertCompiler {
                                 Tuple row = iterator.next();
                                 final long mutationCount = (Long)aggProjector.getColumnProjector(0).getValue(row,
                                         PLong.INSTANCE, ptr);
+                                for (PTable index : getNewIndexes(table)) {
+                                    new MetaDataClient(connection).buildIndex(index, tableRef,
+                                            scan.getTimeRange().getMax(), scan.getTimeRange().getMax() + 1);
+                                }
+                                }
                                 return new MutationState(maxSize, connection) {
                                     @Override
                                     public long getUpdateCount() {
@@ -767,7 +773,19 @@ public class UpsertCompiler {
                             }
                             
                         }
-    
+
+                        private List<PTable> getNewIndexes(PTable table) throws SQLException {
+                            List<PTable> indexes = table.getIndexes();
+                            List<PTable> newIndexes = new ArrayList<PTable>(2);
+                            PTable newTable = PhoenixRuntime.getTableNoCache(connection, table.getName().getString());
+                            for (PTable index : newTable.getIndexes()) {
+                                if (!indexes.contains(index)) {
+                                    newIndexes.add(index);
+                                }
+                            }
+                            return newIndexes;
+                        }
+
                         @Override
                         public ExplainPlan getExplainPlan() throws SQLException {
                             List<String> queryPlanSteps =  aggPlan.getExplainPlan().getPlanSteps();

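Note on the UpsertCompiler hunk above: it handles indexes created while a
long-running server-side UPSERT SELECT was executing. After the aggregate
plan finishes, the client re-reads the table metadata bypassing its cache
and builds any index it did not know about at compile time, restricted to
the timestamp window the statement wrote into. A condensed, hypothetical
view of the flow (assumes an open PhoenixConnection plus the compiled
scan; error handling elided):

    List<PTable> cached = table.getIndexes();                  // known at compile time
    PTable latest = PhoenixRuntime.getTableNoCache(connection, // bypass client cache
            table.getName().getString());
    long maxTs = scan.getTimeRange().getMax();                 // statement's write timestamp
    for (PTable index : latest.getIndexes()) {
        if (!cached.contains(index)) {                         // created mid-statement
            new MetaDataClient(connection).buildIndex(index, tableRef, maxTs, maxTs + 1);
        }
    }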
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f6d02f7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index c5854d3..2dec235 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -35,7 +35,6 @@ import java.security.PrivilegedExceptionAction;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -59,6 +58,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.Region;
@@ -72,7 +72,6 @@ import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.exception.DataExceedsCapacityException;
-import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ExpressionType;
@@ -127,6 +126,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
 
 
 /**
@@ -288,6 +288,41 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         return s;
     }
 
+    class MutationList extends ArrayList<Mutation> implements HeapSize {
+        private long heapSize = 0L;
+        public MutationList() {
+            super();
+        }
+        
+        public MutationList(int size) {
+            super(size);
+        }
+        
+        @Override
+        public boolean add(Mutation e) {
+            boolean r = super.add(e);
+            if (r) {
+                incrementHeapSize(e.heapSize());
+            }
+            return r;
+        }
+
+        @Override
+        public long heapSize() {
+            return heapSize;
+        }
+
+        private void incrementHeapSize(long heapSize) {
+            this.heapSize += heapSize;
+        }
+
+        @Override
+        public void clear() {
+            heapSize = 0L;
+            super.clear();
+        }
+    }
+    
     @Override
     protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException, SQLException {
         RegionCoprocessorEnvironment env = c.getEnvironment();
@@ -339,7 +374,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
         }
         List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto);
-        List<Mutation> indexMutations = localIndexBytes == null ? Collections.<Mutation>emptyList() : Lists.<Mutation>newArrayListWithExpectedSize(1024);
+        MutationList indexMutations = localIndexBytes == null ? new MutationList() : new MutationList(1024);
         
         RegionScanner theScanner = s;
         
@@ -395,9 +430,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), env, useQualifierAsIndex, useNewValueColumnQualifier);
         }
         
-        int batchSize = 0;
-        long batchSizeBytes = 0L;
-        List<Mutation> mutations = Collections.emptyList();
+        int maxBatchSize = 0;
+        long maxBatchSizeBytes = 0L;
+        MutationList mutations = new MutationList();
         boolean needToWrite = false;
         Configuration conf = c.getEnvironment().getConfiguration();
         long flushSize = region.getTableDesc().getMemStoreFlushSize();
@@ -420,10 +455,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan;
         if (isDescRowKeyOrderUpgrade || isDelete || isUpsert || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) {
             needToWrite = true;
-            // TODO: size better
-            mutations = Lists.newArrayListWithExpectedSize(1024);
-            batchSize = env.getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
-            batchSizeBytes = env.getConfiguration().getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
+            maxBatchSize = env.getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+            mutations = new MutationList(Ints.saturatedCast(maxBatchSize + maxBatchSize / 10));
+            maxBatchSizeBytes = env.getConfiguration().getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
                 QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
         }
         Aggregators aggregators = ServerAggregators.deserialize(
@@ -666,22 +700,17 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                     mutations.add(put);
                                 }
                             }
-                            // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
-                            List<List<Mutation>> batchMutationList =
-                                MutationState.getMutationBatchList(batchSize, batchSizeBytes, mutations);
-                            for (List<Mutation> batchMutations : batchMutationList) {
-                                commit(region, batchMutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr,
-                                        txState, areMutationInSameRegion, targetHTable, useIndexProto);
-                                batchMutations.clear();
-                            }
+                        }
+                        if (readyToCommit(mutations, maxBatchSize, maxBatchSizeBytes)) {
+                            commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState,
+                                    areMutationInSameRegion, targetHTable, useIndexProto);
                             mutations.clear();
-                            // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
-                            List<List<Mutation>> batchIndexMutationList =
-                                MutationState.getMutationBatchList(batchSize, batchSizeBytes, indexMutations);
-                            for (List<Mutation> batchIndexMutations : batchIndexMutationList) {
-                                commitBatch(region, batchIndexMutations, null, blockingMemStoreSize, null, txState, useIndexProto);
-                                batchIndexMutations.clear();
-                            }
+                        }
+                        // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
+
+                        if (readyToCommit(indexMutations, maxBatchSize, maxBatchSizeBytes)) {
+                            commitBatch(region, indexMutations, null, blockingMemStoreSize, null, txState,
+                                    useIndexProto);
                             indexMutations.clear();
                         }
                         aggregators.aggregate(rowAggregators, result);
@@ -774,10 +803,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         return false;
     }
 
-    private boolean readyToCommit(List<Mutation> mutations,int batchSize){
-        return !mutations.isEmpty() && batchSize > 0 && mutations.size() > batchSize;
+    private boolean readyToCommit(MutationList mutations, int maxBatchSize, long maxBatchSizeBytes) {
+        return !mutations.isEmpty() && ((maxBatchSize > 0 && mutations.size() > maxBatchSize)
+                || (maxBatchSizeBytes > 0 && mutations.heapSize() > maxBatchSizeBytes));
     }
+
     @Override
     public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
             final InternalScanner scanner, final ScanType scanType) throws IOException {

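Taken together, the observer changes above replace "buffer everything,
then split into batches" with a bounded accumulate-and-flush loop, so the
region server's footprint peaks near maxBatchSizeBytes (plus one batch)
instead of growing with the scan output. A hedged sketch of the resulting
control flow, where rowToMutation(...) and commit(...) abbreviate the real
coprocessor plumbing (index UUID, memstore back-pressure, transactional
state):

    MutationList mutations = new MutationList(maxBatchSize + maxBatchSize / 10);
    boolean hasMore;
    do {
        hasMore = innerScanner.next(results);  // produce one row's worth of cells
        mutations.add(rowToMutation(results)); // hypothetical conversion helper
        if (readyToCommit(mutations, maxBatchSize, maxBatchSizeBytes)) {
            commit(region, mutations);         // flush a bounded partial batch
            mutations.clear();                 // also resets the tracked heapSize
        }
    } while (hasMore);
    if (!mutations.isEmpty()) {
        commit(region, mutations);             // flush the final remainder
    }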
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f6d02f7/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 262047c..f2820f2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1226,21 +1226,63 @@ public class MetaDataClient {
         }
         throw new IllegalStateException(); // impossible
     }
+    
+    /**
+     * For new mutations only; should not be used if there are deletes done in the data table between the start
+     * time and end time passed to the method.
+     */
+    public MutationState buildIndex(PTable index, TableRef dataTableRef, long startTime, long endTime)
+            throws SQLException {
+        boolean wasAutoCommit = connection.getAutoCommit();
+        try {
+            AlterIndexStatement indexStatement = FACTORY
+                    .alterIndex(
+                            FACTORY.namedTable(null,
+                                    TableName.create(index.getSchemaName().getString(),
+                                            index.getTableName().getString())),
+                            dataTableRef.getTable().getTableName().getString(), false, PIndexState.INACTIVE);
+            alterIndex(indexStatement);
+            connection.setAutoCommit(true);
+            MutationPlan mutationPlan = getMutationPlanForBuildingIndex(index, dataTableRef);
+            Scan scan = mutationPlan.getContext().getScan();
+            try {
+                scan.setTimeRange(startTime, endTime);
+            } catch (IOException e) {
+                throw new SQLException(e);
+            }
+            MutationState state = connection.getQueryServices().updateData(mutationPlan);
+            indexStatement = FACTORY
+                    .alterIndex(
+                            FACTORY.namedTable(null,
+                                    TableName.create(index.getSchemaName().getString(),
+                                            index.getTableName().getString())),
+                            dataTableRef.getTable().getTableName().getString(), false, PIndexState.ACTIVE);
+            alterIndex(indexStatement);
+            return state;
+        } finally {
+            connection.setAutoCommit(wasAutoCommit);
+        }
+    }
+
+    private MutationPlan getMutationPlanForBuildingIndex(PTable index, TableRef dataTableRef) throws SQLException {
+        MutationPlan mutationPlan;
+        if (index.getIndexType() == IndexType.LOCAL) {
+            PostLocalIndexDDLCompiler compiler =
+                    new PostLocalIndexDDLCompiler(connection, getFullTableName(dataTableRef));
+            mutationPlan = compiler.compile(index);
+        } else {
+            PostIndexDDLCompiler compiler = new PostIndexDDLCompiler(connection, dataTableRef);
+            mutationPlan = compiler.compile(index);
+        }
+        return mutationPlan;
+    }
 
     private MutationState buildIndex(PTable index, TableRef dataTableRef) throws SQLException {
         AlterIndexStatement indexStatement = null;
         boolean wasAutoCommit = connection.getAutoCommit();
         try {
             connection.setAutoCommit(true);
-            MutationPlan mutationPlan;
-            if (index.getIndexType() == IndexType.LOCAL) {
-                PostLocalIndexDDLCompiler compiler =
-                        new PostLocalIndexDDLCompiler(connection, getFullTableName(dataTableRef));
-                mutationPlan = compiler.compile(index);
-            } else {
-                PostIndexDDLCompiler compiler = new PostIndexDDLCompiler(connection, dataTableRef);
-                mutationPlan = compiler.compile(index);
-            }
+            MutationPlan mutationPlan = getMutationPlanForBuildingIndex(index, dataTableRef);
             Scan scan = mutationPlan.getContext().getScan();
             Long scn = connection.getSCN();
             try {

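For reference, the new buildIndex(index, dataTableRef, startTime, endTime)
overload above follows a three-step protocol: mark the index INACTIVE, run
the rebuild scan restricted to [startTime, endTime), then flip the index
back to ACTIVE. As its javadoc warns, the window-restricted rebuild is
only safe when no deletes hit the data table inside that window, since a
time-range scan replays puts but cannot see what was removed. A minimal
caller-side sketch (the timestamp choice mirrors the UpsertCompiler hunk
earlier in this commit):

    // Cells written by a single server-side statement all carry the scan's
    // upper timestamp, so [maxTs, maxTs + 1) covers exactly those rows.
    long maxTs = scan.getTimeRange().getMax();
    MutationState state = new MetaDataClient(connection)
            .buildIndex(index, dataTableRef, maxTs, maxTs + 1);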