PHOENIX-3161 Check possibility of moving rebuilding code to coprocessor of data table.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a95e8ab1 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a95e8ab1 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a95e8ab1 Branch: refs/heads/calcite Commit: a95e8ab1af2b8defef3d8c0ed5a060c9b9881dd9 Parents: 5c9fb7b Author: Ankit Singhal <ankitsingha...@gmail.com> Authored: Fri Oct 21 19:34:08 2016 +0530 Committer: Ankit Singhal <ankitsingha...@gmail.com> Committed: Fri Oct 21 19:34:08 2016 +0530 ---------------------------------------------------------------------- .../end2end/index/MutableIndexFailureIT.java | 11 +- .../apache/phoenix/compile/PostDDLCompiler.java | 2 +- .../coprocessor/BaseScannerRegionObserver.java | 1 + .../coprocessor/MetaDataRegionObserver.java | 101 +++++++------------ .../UngroupedAggregateRegionObserver.java | 100 +++++++++++++++++- .../hbase/index/util/IndexManagementUtil.java | 13 ++- .../java/org/apache/phoenix/util/ScanUtil.java | 4 + 7 files changed, 154 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/a95e8ab1/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java index d6c1e9c..4263890 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java @@ -92,7 +92,7 @@ public class MutableIndexFailureIT extends BaseOwnClusterIT { public MutableIndexFailureIT(boolean transactional, boolean localIndex, boolean isNamespaceMapped) { this.transactional = transactional; this.localIndex = localIndex; - this.tableDDLOptions = transactional ? " TRANSACTIONAL=true " : ""; + this.tableDDLOptions = " SALT_BUCKETS=2 " + (transactional ? ", TRANSACTIONAL=true " : ""); this.tableName = (localIndex ? "L_" : "") + TestUtil.DEFAULT_DATA_TABLE_NAME + (transactional ? "_TXN" : "") + (isNamespaceMapped ? "_NM" : ""); this.indexName = INDEX_NAME; @@ -180,8 +180,8 @@ public class MutableIndexFailureIT extends BaseOwnClusterIT { query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName; rs = conn.createStatement().executeQuery("EXPLAIN " + query); - String expectedPlan = "CLIENT PARALLEL 1-WAY FULL SCAN OVER " - + SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped); + String expectedPlan = "CLIENT PARALLEL 2-WAY FULL SCAN OVER " + + SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped)+"\nCLIENT MERGE SORT"; assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs)); rs = conn.createStatement().executeQuery(query); assertTrue(rs.next()); @@ -232,8 +232,8 @@ public class MutableIndexFailureIT extends BaseOwnClusterIT { // Verify previous writes succeeded to data table query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName; rs = conn.createStatement().executeQuery("EXPLAIN " + query); - expectedPlan = "CLIENT PARALLEL 1-WAY FULL SCAN OVER " - + SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped); + expectedPlan = "CLIENT PARALLEL 2-WAY FULL SCAN OVER " + + SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped)+"\nCLIENT MERGE SORT"; assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs)); rs = conn.createStatement().executeQuery(query); assertTrue(rs.next()); @@ -254,6 +254,7 @@ public class MutableIndexFailureIT extends BaseOwnClusterIT { // re-enable index table FAIL_WRITE = false; waitForIndexToBeActive(conn,indexName); + waitForIndexToBeActive(conn,indexName+"_2"); waitForIndexToBeActive(conn,secondIndexName); // Verify UPSERT on data table still work after index table is recreated http://git-wip-us.apache.org/repos/asf/phoenix/blob/a95e8ab1/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java index 004e254..393499a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java @@ -210,7 +210,7 @@ public class PostDDLCompiler { if (ts!=HConstants.LATEST_TIMESTAMP && tableRef.getTable().isTransactional()) { ts = TransactionUtil.convertToNanoseconds(ts); } - ScanUtil.setTimeRange(scan, ts); + ScanUtil.setTimeRange(scan, scan.getTimeRange().getMin(), ts); if (emptyCF != null) { scan.setAttribute(BaseScannerRegionObserver.EMPTY_CF, emptyCF); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a95e8ab1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 4fa1399..f6bd512 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -88,6 +88,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public static final String EXPECTED_UPPER_REGION_KEY = "_ExpectedUpperRegionKey"; public static final String REVERSE_SCAN = "_ReverseScan"; public static final String ANALYZE_TABLE = "_ANALYZETABLE"; + public static final String REBUILD_INDEXES = "_RebuildIndexes"; public static final String TX_STATE = "_TxState"; public static final String GUIDEPOST_WIDTH_BYTES = "_GUIDEPOST_WIDTH_BYTES"; public static final String GUIDEPOST_PER_REGION = "_GUIDEPOST_PER_REGION"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/a95e8ab1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java index c645cf4..e790b59 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java @@ -17,8 +17,9 @@ */ package org.apache.phoenix.coprocessor; +import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; + import java.io.IOException; -import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; @@ -33,16 +34,12 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; @@ -55,19 +52,19 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.phoenix.cache.GlobalCache; -import org.apache.phoenix.cache.ServerCacheClient; +import org.apache.phoenix.compile.MutationPlan; +import org.apache.phoenix.compile.PostDDLCompiler; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixDriver; -import org.apache.phoenix.parse.AlterIndexStatement; -import org.apache.phoenix.parse.NamedTableNode; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.MetaDataClient; @@ -75,7 +72,7 @@ import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableNotFoundException; -import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.MetaDataUtil; @@ -281,6 +278,11 @@ public class MetaDataRegionObserver extends BaseRegionObserver { // Set SCN so that we don't ping server and have the upper bound set back to // the timestamp when the failure occurred. props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(Long.MAX_VALUE)); + + //Set timeout to max value as rebuilding may take time + props.setProperty(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, Long.toString(Long.MAX_VALUE)); + props.setProperty(QueryServices.HBASE_CLIENT_SCANNER_TIMEOUT_ATTRIB, Long.toString(Long.MAX_VALUE)); + props.setProperty(QueryServices.RPC_TIMEOUT_ATTRIB, Long.toString(Long.MAX_VALUE)); // don't run a second index populations upsert select props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0"); conn = QueryUtil.getConnectionOnServer(props, env.getConfiguration()).unwrap(PhoenixConnection.class); @@ -305,7 +307,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { // Allow index to begin incremental maintenance as index is back online and we // cannot transition directly from DISABLED -> ACTIVE if (Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexState) == 0) { - updateIndexState(indexTableFullName, env, PIndexState.DISABLE, PIndexState.INACTIVE); + updateIndexState(conn, indexTableFullName, env, PIndexState.DISABLE, PIndexState.INACTIVE); } List<PTable> indexesToPartiallyRebuild = dataTableToIndexesMap.get(dataPTable); if (indexesToPartiallyRebuild == null) { @@ -344,62 +346,31 @@ public class MetaDataRegionObserver extends BaseRegionObserver { long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime); LOG.info("Starting to build " + dataPTable + " indexes " + indexesToPartiallyRebuild + " from timestamp=" + timeStamp); - Scan dataTableScan = IndexManagementUtil.newLocalStateScan(maintainers); + TableRef tableRef = new TableRef(null, dataPTable, HConstants.LATEST_TIMESTAMP, false); + // TODO Need to set high timeout + PostDDLCompiler compiler = new PostDDLCompiler(conn); + MutationPlan plan = compiler.compile(Collections.singletonList(tableRef), null, null, null, + HConstants.LATEST_TIMESTAMP); + Scan dataTableScan = IndexManagementUtil.newLocalStateScan(plan.getContext().getScan(), + maintainers); dataTableScan.setTimeRange(timeStamp, HConstants.LATEST_TIMESTAMP); - byte[] physicalTableName = dataPTable.getPhysicalName().getBytes(); - try (HTableInterface dataHTable = conn.getQueryServices().getTable(physicalTableName)) { - Result result; - try (ResultScanner dataTableScanner = dataHTable.getScanner(dataTableScan)) { - int batchSize = conn.getMutateBatchSize(); - List<Mutation> mutations = Lists.newArrayListWithExpectedSize(batchSize); - ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable( - ByteUtil.EMPTY_BYTE_ARRAY); - IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, - indexesToPartiallyRebuild, conn); - byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr); - byte[] uuidValue = ServerCacheClient.generateId(); + dataTableScan.setCacheBlocks(false); + dataTableScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES); + + ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable( + ByteUtil.EMPTY_BYTE_ARRAY); + IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, indexesToPartiallyRebuild, + conn); + byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr); - while ((result = dataTableScanner.next()) != null && !result.isEmpty()) { - Put put = null; - Delete del = null; - for (Cell cell : result.rawCells()) { - if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) { - if (put == null) { - put = new Put(CellUtil.cloneRow(cell)); - put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); - put.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue); - put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, - PDataType.TRUE_BYTES); - mutations.add(put); - } - put.add(cell); - } else { - if (del == null) { - del = new Delete(CellUtil.cloneRow(cell)); - del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); - del.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue); - del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, - PDataType.TRUE_BYTES); - mutations.add(del); - } - del.addDeleteMarker(cell); - } - } - if (mutations.size() == batchSize) { - dataHTable.batch(mutations); - uuidValue = ServerCacheClient.generateId(); - mutations.clear(); - } - } - if (!mutations.isEmpty()) { - dataHTable.batch(mutations); - } - } - } + dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue); + MutationState mutationState = plan.execute(); + long rowCount = mutationState.getUpdateCount(); + LOG.info(rowCount + " rows of index which are rebuild"); for (PTable indexPTable : indexesToPartiallyRebuild) { String indexTableFullName = SchemaUtil.getTableName(indexPTable.getSchemaName() .getString(), indexPTable.getTableName().getString()); - updateIndexState(indexTableFullName, env, PIndexState.INACTIVE, PIndexState.ACTIVE); + updateIndexState(conn, indexTableFullName, env, PIndexState.INACTIVE, PIndexState.ACTIVE); } } catch (Exception e) { // Log, but try next table's indexes LOG.warn("Unable to rebuild " + dataPTable + " indexes " + indexesToPartiallyRebuild @@ -429,11 +400,11 @@ public class MetaDataRegionObserver extends BaseRegionObserver { } } - private static void updateIndexState(String indexTableName, RegionCoprocessorEnvironment env, PIndexState oldState, + private static void updateIndexState(PhoenixConnection conn, String indexTableName, RegionCoprocessorEnvironment env, PIndexState oldState, PIndexState newState) throws ServiceException, Throwable { byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName); String schemaName = SchemaUtil.getSchemaNameFromFullName(indexTableName); - String indexName = SchemaUtil.getSchemaNameFromFullName(indexTableName); + String indexName = SchemaUtil.getTableNameFromFullName(indexTableName); // Mimic the Put that gets generated by the client on an update of the index state Put put = new Put(indexTableKey); put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES, @@ -443,9 +414,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { PLong.INSTANCE.toBytes(0)); } final List<Mutation> tableMetadata = Collections.<Mutation> singletonList(put); - Connection conn = QueryUtil.getConnection(env.getConfiguration()); - MetaDataMutationResult result = conn.unwrap(PhoenixConnection.class).getQueryServices() - .updateIndexState(tableMetadata, null); + MetaDataMutationResult result = conn.getQueryServices().updateIndexState(tableMetadata, null); MutationCode code = result.getMutationCode(); if (code == MutationCode.TABLE_NOT_FOUND) { throw new TableNotFoundException(schemaName, indexName); } if (code == MutationCode.UNALLOWED_TABLE_MUTATION) { throw new SQLExceptionInfo.Builder( http://git-wip-us.apache.org/repos/asf/phoenix/blob/a95e8ab1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 0f175c5..38f7253 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -43,6 +43,7 @@ import javax.annotation.concurrent.GuardedBy; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; @@ -66,6 +67,7 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.WritableUtils; +import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.coprocessor.generated.PTableProtos; import org.apache.phoenix.exception.DataExceedsCapacityException; import org.apache.phoenix.execute.TupleProjector; @@ -263,7 +265,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver env, region.getRegionInfo().getTable().getNameAsString(), ts, gp_width_bytes, gp_per_region_bytes); return collectStats(s, statsCollector, region, scan, env.getConfiguration()); - } + } else if (ScanUtil.isIndexRebuild(scan)) { return rebuildIndices(s, region, scan, env.getConfiguration()); } int offsetToBe = 0; if (localIndexScan) { /* @@ -725,6 +727,102 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } + private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan, + Configuration config) throws IOException { + byte[] indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD); + boolean hasMore; + long rowCount = 0; + try { + int batchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); + List<Mutation> mutations = Lists.newArrayListWithExpectedSize(batchSize); + region.startRegionOperation(); + byte[] uuidValue = ServerCacheClient.generateId(); + synchronized (innerScanner) { + do { + List<Cell> results = new ArrayList<Cell>(); + hasMore = innerScanner.nextRaw(results); + if (!results.isEmpty()) { + Put put = null; + Delete del = null; + for (Cell cell : results) { + + if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) { + if (put == null) { + put = new Put(CellUtil.cloneRow(cell)); + put.setAttribute(PhoenixIndexCodec.INDEX_MD, indexMetaData); + put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); + put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, + PDataType.TRUE_BYTES); + mutations.add(put); + } + put.add(cell); + } else { + if (del == null) { + del = new Delete(CellUtil.cloneRow(cell)); + del.setAttribute(PhoenixIndexCodec.INDEX_MD, indexMetaData); + del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); + del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, + PDataType.TRUE_BYTES); + mutations.add(del); + } + del.addDeleteMarker(cell); + } + } + if (mutations.size() >= batchSize) { + region.batchMutate(mutations.toArray(new Mutation[mutations.size()]), HConstants.NO_NONCE, + HConstants.NO_NONCE); + uuidValue = ServerCacheClient.generateId(); + mutations.clear(); + } + rowCount++; + } + + } while (hasMore); + if (!mutations.isEmpty()) { + region.batchMutate(mutations.toArray(new Mutation[mutations.size()]), HConstants.NO_NONCE, + HConstants.NO_NONCE); + } + } + } catch (IOException e) { + logger.error("IOException during rebuilding: " + Throwables.getStackTraceAsString(e)); + throw e; + } finally { + region.closeRegionOperation(); + } + byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount)); + final KeyValue aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, + SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length); + + RegionScanner scanner = new BaseRegionScanner(innerScanner) { + @Override + public HRegionInfo getRegionInfo() { + return region.getRegionInfo(); + } + + @Override + public boolean isFilterDone() { + return true; + } + + @Override + public void close() throws IOException { + // no-op because we want to manage closing of the inner scanner ourselves. + } + + @Override + public boolean next(List<Cell> results) throws IOException { + results.add(aggKeyValue); + return false; + } + + @Override + public long getMaxResultSize() { + return scan.getMaxResultSize(); + } + }; + return scanner; + } + private RegionScanner collectStats(final RegionScanner innerScanner, StatisticsCollector stats, final Region region, final Scan scan, Configuration config) throws IOException { StatsCollectionCallable callable = http://git-wip-us.apache.org/repos/asf/phoenix/blob/a95e8ab1/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java index 4519145..c6642e7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java @@ -20,18 +20,14 @@ package org.apache.phoenix.hbase.index.util; import java.io.IOException; import java.util.Collection; import java.util.List; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; - -import com.google.common.collect.Maps; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.builder.IndexBuildingFailureException; import org.apache.phoenix.hbase.index.covered.data.LazyValueGetter; @@ -150,7 +146,14 @@ public class IndexManagementUtil { } public static Scan newLocalStateScan(List<? extends Iterable<? extends ColumnReference>> refsArray) { - Scan s = new Scan(); + return newLocalStateScan(null, refsArray); + } + + public static Scan newLocalStateScan(Scan scan, List<? extends Iterable<? extends ColumnReference>> refsArray) { + Scan s = scan; + if (scan == null) { + s = new Scan(); + } s.setRaw(true); // add the necessary columns to the scan for (Iterable<? extends ColumnReference> refs : refsArray) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/a95e8ab1/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java index b0e8a99..acaeb31 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java @@ -895,4 +895,8 @@ public class ScanUtil { return true; } + public static boolean isIndexRebuild(Scan scan) { + return scan.getAttribute((BaseScannerRegionObserver.REBUILD_INDEXES)) != null; + } + } \ No newline at end of file