Repository: phoenix
Updated Branches:
  refs/heads/master da345f84c -> 6ff638e18
PHOENIX-2724 Query with large number of guideposts is slower compared to no stats

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6ff638e1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6ff638e1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6ff638e1

Branch: refs/heads/master
Commit: 6ff638e18de4b591aefd428cf888c5088d8ec175
Parents: da345f8
Author: Samarth <samarth.j...@salesforce.com>
Authored: Fri Apr 22 23:33:17 2016 -0700
Committer: Samarth <samarth.j...@salesforce.com>
Committed: Fri Apr 22 23:33:17 2016 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/QueryWithOffsetIT.java      |  33 ++---
 .../phoenix/end2end/SerialIteratorsIT.java      |  90 ++++++++++++
 .../phoenix/coprocessor/ScanRegionObserver.java |   9 +-
 .../org/apache/phoenix/execute/ScanPlan.java    |  17 ++-
 .../phoenix/iterate/BaseResultIterators.java    |   4 +-
 .../phoenix/iterate/ChunkedResultIterator.java  |  13 +-
 .../phoenix/iterate/OffsetResultIterator.java   |   2 +-
 .../phoenix/iterate/ParallelIterators.java      |   4 +-
 .../apache/phoenix/iterate/SerialIterators.java | 142 ++++++++++++++-----
 .../phoenix/iterate/SpoolingResultIterator.java |  17 ++-
 .../phoenix/iterate/TableResultIterator.java    |  22 +--
 .../apache/phoenix/jdbc/PhoenixConnection.java  |   9 +-
 .../apache/phoenix/query/QueryConstants.java    |   2 +-
 .../org/apache/phoenix/schema/PTableImpl.java   |   1 +
 .../java/org/apache/phoenix/util/QueryUtil.java |  10 +-
 15 files changed, 282 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
index 83d759b..dd21f31 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
@@ -89,9 +89,10 @@ public class QueryWithOffsetIT extends BaseOwnClusterHBaseManagedTimeIT {
         rs = conn.createStatement()
                 .executeQuery("SELECT t_id from " + tableName + " order by t_id limit " + limit + " offset " + offset);
         int i = 0;
-        while (i++ < limit) {
+        while (i < limit) {
             assertTrue(rs.next());
-            assertEquals(strings[offset + i - 1], rs.getString(1));
+            assertEquals("Expected string didn't match for i = " + i, strings[offset + i], rs.getString(1));
+            i++;
         }
 
         limit = 35;
@@ -176,20 +177,6 @@ public class QueryWithOffsetIT extends BaseOwnClusterHBaseManagedTimeIT {
         conn.close();
     }
 
-    private void initTableValues(Connection conn) throws SQLException {
-        for (int i = 0; i < 26; i++) {
-            conn.createStatement().execute("UPSERT INTO " + tableName + " values('" + strings[i] + "'," + i + ","
-                    + (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')");
-        }
-        conn.commit();
-    }
-
-    private void updateStatistics(Connection conn) throws SQLException {
-        String query = "UPDATE STATISTICS " + tableName + " SET \"" + QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB
-                + "\"=" + Long.toString(500);
-        conn.createStatement().execute(query);
-    }
-
     @Test
     public void testMetaDataWithOffset() throws SQLException {
         Connection conn;
@@ -207,5 +194,19 @@ public class QueryWithOffsetIT extends BaseOwnClusterHBaseManagedTimeIT {
         ResultSetMetaData md = rs.getMetaData();
         assertEquals(5, md.getColumnCount());
     }
+
+    private void initTableValues(Connection conn) throws SQLException {
+        for (int i = 0; i < 26; i++) {
+            conn.createStatement().execute("UPSERT INTO " + tableName + " values('" + strings[i] + "'," + i + ","
+                    + (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')");
+        }
+        conn.commit();
+    }
+
+    private void updateStatistics(Connection conn) throws SQLException {
+        String query = "UPDATE STATISTICS " + tableName + " SET \"" + QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB
+                + "\"=" + Long.toString(500);
+        conn.createStatement().execute(query);
+    }
 }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/it/java/org/apache/phoenix/end2end/SerialIteratorsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SerialIteratorsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SerialIteratorsIT.java
new file mode 100644
index 0000000..d4c71af
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SerialIteratorsIT.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class SerialIteratorsIT extends BaseHBaseManagedTimeTableReuseIT {
+    private String tableName = generateRandomString();
+    private final String[] strings = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p",
+            "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" };
+    private final String ddl = "CREATE TABLE " + tableName + " (t_id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n"
+            + "k2 INTEGER NOT NULL,\n" + "C3.k3 INTEGER,\n" + "C2.v1 VARCHAR,\n"
+            + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2)) SPLIT ON ('e','i','o')";
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+        // Don't force row key order
+        props.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Test
+    public void testConcatenatingSerialIterators() throws Exception {
+        Connection conn;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        conn = DriverManager.getConnection(getUrl(), props);
+        createTestTable(getUrl(), ddl);
+        initTableValues(conn);
+        String query = "SELECT t_id from " + tableName + " order by t_id desc limit " + 10;
+        PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
+        ResultSet rs = stmt.executeQuery(query);
+        int i = 25;
+        while (i >= 16) {
+            assertTrue(rs.next());
+            assertEquals(strings[i--], rs.getString(1));
+        }
+        query = "SELECT t_id from " + tableName + " order by t_id limit " + 10;
+        stmt = conn.createStatement().unwrap(PhoenixStatement.class);
+        rs = stmt.executeQuery(query);
+        i = 0;
+        while (i < 10) {
+            assertTrue(rs.next());
+            assertEquals(strings[i++], rs.getString(1));
+        }
+        conn.close();
+    }
+
+    private void initTableValues(Connection conn) throws SQLException {
+        for (int i = 0; i < 26; i++) {
+            conn.createStatement().execute("UPSERT INTO " + tableName + " values('" + strings[i] + "'," + i + ","
+                    + (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')");
+        }
+        conn.commit();
+    }
+
+}
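[The new IT asserts row contents but not the plan shape. A quick way to confirm that a query actually runs through SerialIterators is to look at its EXPLAIN output, which labels serial plans with SERIAL (e.g. "CLIENT SERIAL 1-WAY ..."). A minimal sketch, assuming a local Phoenix instance and a placeholder table named T; the exact plan text varies across Phoenix versions:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    public class ExplainSerialPlan {
        public static void main(String[] args) throws Exception {
            // Placeholder JDBC URL and table name -- adjust for your cluster
            Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
            // A LIMIT (or OFFSET) without forced row key order makes a plan eligible for serial execution
            ResultSet rs = conn.createStatement().executeQuery(
                    "EXPLAIN SELECT t_id FROM T ORDER BY t_id LIMIT 10");
            while (rs.next()) {
                System.out.println(rs.getString(1)); // look for SERIAL vs. PARALLEL in the plan
            }
            conn.close();
        }
    }
]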
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 905dfa0..5feb3e9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -60,6 +60,7 @@ import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
 import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
@@ -193,7 +194,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
         byte[] scanOffsetBytes = scan.getAttribute(BaseScannerRegionObserver.SCAN_OFFSET);
         Integer scanOffset = null;
         if (scanOffsetBytes != null) {
-            scanOffset = Bytes.toInt(scanOffsetBytes);
+            scanOffset = (Integer) PInteger.INSTANCE.toObject(scanOffsetBytes);
         }
         RegionScanner innerScanner = s;
@@ -247,15 +248,11 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
             final Region region = c.getEnvironment().getRegion();
             region.startRegionOperation();
             try {
-                // Once we return from the first call to next, we've run through and
-                // cached
-                // the topN rows, so we no longer need to start/stop a region
-                // operation.
                 Tuple tuple = iterator.next();
                 if (tuple == null && !isLastScan) {
                     List<KeyValue> kvList = new ArrayList<KeyValue>(1);
                     KeyValue kv = new KeyValue(QueryConstants.OFFSET_ROW_KEY_BYTES, QueryConstants.OFFSET_FAMILY,
-                            QueryConstants.OFFSET_COLUMN, Bytes.toBytes(iterator.getUnusedOffset()));
+                            QueryConstants.OFFSET_COLUMN, PInteger.INSTANCE.toBytes(iterator.getRemainingOffset()));
                     kvList.add(kv);
                     Result r = new Result(kvList);
                     firstTuple = new ResultTuple(r);
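[Both hunks above replace raw Bytes.toInt()/Bytes.toBytes() with Phoenix's PInteger codec, so the region server that emits the leftover offset and the client that consumes it agree on a single serialization. A standalone round trip of that codec, as an illustrative sketch that is not part of the patch:

    import org.apache.phoenix.schema.types.PInteger;

    public class OffsetCodecRoundTrip {
        public static void main(String[] args) {
            // Server side: serialize the remaining offset into the _OFFSET_ cell value
            byte[] encoded = PInteger.INSTANCE.toBytes(42);
            // Client side: decode with the same codec instead of Bytes.toInt()
            Integer decoded = (Integer) PInteger.INSTANCE.toObject(encoded);
            System.out.println(decoded); // prints 42
        }
    }
]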
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 93ae5d6..c5dabfe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -18,6 +18,9 @@
 package org.apache.phoenix.execute;
 
+import static org.apache.phoenix.util.ScanUtil.isPacingScannersPossible;
+import static org.apache.phoenix.util.ScanUtil.isRoundRobinPossible;
+
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
@@ -113,6 +116,15 @@ public class ScanPlan extends BaseQueryPlan {
             return false;
         }
         PTable table = tableRef.getTable();
+        /*
+         * For salted or local index tables, if rows are requested in a row key order, then we
+         * cannot execute a query serially. We need to be able to do a merge sort across all scans
+         * which isn't possible with SerialIterators. For other kinds of tables though we are ok
+         * since SerialIterators execute scans in the correct order.
+         */
+        if ((table.getBucketNum() != null || table.getIndexType() == IndexType.LOCAL)
+                && ScanUtil.shouldRowsBeInRowKeyOrder(orderBy, context)) {
+            return false;
+        }
         GuidePostsInfo gpsInfo = table.getTableStats().getGuidePosts().get(SchemaUtil.getEmptyColumnFamily(table));
         long estRowSize = SchemaUtil.estimateRowSize(table);
         long estRegionSize;
@@ -147,8 +159,9 @@ public class ScanPlan extends BaseQueryPlan {
             TableRef table, OrderBy orderBy, Integer limit, Integer offset, boolean allowPageFilter) throws SQLException {
         if ((isSerial(context, statement, table, orderBy, limit, offset, allowPageFilter)
-                || ScanUtil.isRoundRobinPossible(orderBy, context) || ScanUtil.isPacingScannersPossible(context))
-                && offset == null) { return ParallelIteratorFactory.NOOP_FACTORY; }
+                || isRoundRobinPossible(orderBy, context) || isPacingScannersPossible(context))) {
+            return ParallelIteratorFactory.NOOP_FACTORY;
+        }
         ParallelIteratorFactory spoolingResultIteratorFactory = new SpoolingResultIterator.SpoolingResultIteratorFactory(
                 context.getConnection().getQueryServices());
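[The comment added to isSerial() carries the key reasoning: SerialIterators merely concatenates scans, which preserves row key order only for plain tables. Restated as a standalone predicate — a sketch in which the PTable and ScanUtil calls from the hunk are reduced to booleans, not the actual Phoenix signature:

    final class SerialEligibility {
        // Salted tables and local indexes spread row-key-adjacent rows across scans,
        // so producing rows in row key order would need a client-side merge sort,
        // which a serial concatenation of scans cannot provide.
        static boolean serialAllowed(boolean saltedOrLocalIndex, boolean rowKeyOrderRequired) {
            return !(saltedOrLocalIndex && rowKeyOrderRequired);
        }

        public static void main(String[] args) {
            System.out.println(serialAllowed(true, true));   // false: needs parallel + merge sort
            System.out.println(serialAllowed(true, false));  // true: order not required
            System.out.println(serialAllowed(false, true));  // true: concatenation keeps key order
        }
    }
]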
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 0299f18..043bd30 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -676,7 +676,7 @@ public abstract class BaseResultIterators extends ExplainTable implements ResultIterators {
         SQLException toThrow = null;
         int queryTimeOut = context.getStatement().getQueryTimeoutInMillis();
         try {
-            submitWork(scan, futures, allIterators, splitSize);
+            submitWork(scan, futures, allIterators, splitSize, isReverse);
             boolean clearedCache = false;
             for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) {
                 List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size());
@@ -868,7 +868,7 @@ public abstract class BaseResultIterators extends ExplainTable implements ResultIterators {
     abstract protected String getName();
     abstract protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            Queue<PeekingResultIterator> allIterators, int estFlattenedSize) throws SQLException;
+            Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse) throws SQLException;
 
     @Override
     public int size() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index a78565d..7a830de 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -44,7 +44,13 @@ import com.google.common.base.Preconditions;
 /**
  * {@code PeekingResultIterator} implementation that loads data in chunks. This is intended for
  * basic scan plans, to avoid loading large quantities of data from HBase in one go.
+ *
+ * <p>
+ * Chunking is deprecated and shouldn't be used while implementing new features. As of HBase 0.98.17,
+ * we rely on pacing the server side scanners instead of pulling rows from the server in chunks.
+ * </p>
  */
+@Deprecated
 public class ChunkedResultIterator implements PeekingResultIterator {
     private static final Logger logger = LoggerFactory.getLogger(ChunkedResultIterator.class);
 
@@ -56,7 +62,12 @@ public class ChunkedResultIterator implements PeekingResultIterator {
     private final MutationState mutationState;
     private Scan scan;
     private PeekingResultIterator resultIterator;
-
+
+    /**
+     * Chunking is deprecated and shouldn't be used while implementing new features. As of HBase 0.98.17,
+     * we rely on pacing the server side scanners instead of pulling rows from the server in chunks.
+     */
+    @Deprecated
    public static class ChunkedResultIteratorFactory implements ParallelIteratorFactory {

         private final ParallelIteratorFactory delegateFactory;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
index ef8eacf..db53806 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
@@ -56,7 +56,7 @@ public class OffsetResultIterator extends DelegateResultIterator {
         return "OffsetResultIterator [rowCount=" + rowCount + ", offset=" + offset + "]";
     }
 
-    public Integer getUnusedOffset() {
+    public Integer getRemainingOffset() {
         return (offset - rowCount) > 0 ? (offset - rowCount) : 0;
     }
 }
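[getRemainingOffset() is the heart of the offset hand-off: each scan consumes as many offset rows as its region holds, and only the leftover is forwarded to the next scan. A worked example of the arithmetic — an illustrative sketch with made-up region row counts:

    public class RemainingOffsetDemo {
        // Mirrors OffsetResultIterator.getRemainingOffset(): never negative
        static int remainingOffset(int offset, int rowsSkippedInRegion) {
            return Math.max(offset - rowsSkippedInRegion, 0);
        }

        public static void main(String[] args) {
            int offset = 100;
            for (int rows : new int[] { 40, 35, 50 }) {
                int skipped = Math.min(offset, rows);
                offset = remainingOffset(offset, rows);
                System.out.println("skipped " + skipped + " rows, remaining offset = " + offset);
            }
            // skipped 40 -> 60 left; skipped 35 -> 25 left; skipped 25 -> 0 left
        }
    }
]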
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index ca0eba0..a5664c7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -67,7 +67,7 @@ public class ParallelIterators extends BaseResultIterators {
 
     @Override
     protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize) throws SQLException {
+            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse) throws SQLException {
         // Pre-populate nestedFutures lists so that we can shuffle the scans
         // and add the future to the right nested list. By shuffling the scans
         // we get better utilization of the cluster since our thread executor
@@ -99,7 +99,7 @@ public class ParallelIterators extends BaseResultIterators {
             final CombinableMetric scanMetrics = readMetrics.allotMetric(MetricType.SCAN_BYTES, physicalTableName);
             final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(readMetrics, physicalTableName);
             final TableResultIterator tableResultItr = context.getConnection().getTableResultIteratorFactory().newIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold);
-            context.getConnection().addIterator(tableResultItr);
+            context.getConnection().addIteratorForLeaseRenewal(tableResultItr);
             Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
 
                 @Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index 17c2279..d2c89b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -30,13 +30,16 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.job.JobManager.JobCallable;
 import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.trace.util.Tracing;
-import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.QueryUtil;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -53,11 +56,13 @@ import com.google.common.collect.Lists;
 public class SerialIterators extends BaseResultIterators {
     private static final String NAME = "SERIAL";
     private final ParallelIteratorFactory iteratorFactory;
+    private final Integer offset;
 
     public SerialIterators(QueryPlan plan, Integer perScanLimit, Integer offset,
             ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper)
             throws SQLException {
         super(plan, perScanLimit, offset, scanGrouper);
+        this.offset = offset;
         // must be an offset or a limit specified or a SERIAL hint
         Preconditions.checkArgument(
                 offset != null || perScanLimit != null || plan.getStatement().getHint().hasHint(HintNode.Hint.SERIAL));
@@ -65,40 +70,28 @@ public class SerialIterators extends BaseResultIterators {
     }
 
     @Override
-    protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize) {
-        // Pre-populate nestedFutures lists so that we can shuffle the scans
-        // and add the future to the right nested list. By shuffling the scans
-        // we get better utilization of the cluster since our thread executor
-        // will spray the scans across machines as opposed to targeting a
-        // single one since the scans are in row key order.
+    protected void submitWork(final List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
+            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse) {
         ExecutorService executor = context.getConnection().getQueryServices().getExecutor();
-
-        for (final List<Scan> scans : nestedScans) {
-            Scan firstScan = scans.get(0);
-            Scan lastScan = scans.get(scans.size()-1);
-            final Scan overallScan = ScanUtil.newScan(firstScan);
-            overallScan.setStopRow(lastScan.getStopRow());
-            final String tableName = tableRef.getTable().getPhysicalName().getString();
-            final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(context.getReadMetricsQueue(), tableName);
-            final PhoenixConnection conn = context.getConnection();
-            final long renewLeaseThreshold = conn.getQueryServices().getRenewLeaseThresholdMilliSeconds();
-            lastScan.setAttribute(QueryConstants.LAST_SCAN, Bytes.toBytes(Boolean.TRUE));
+        final String tableName = tableRef.getTable().getPhysicalName().getString();
+        final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(context.getReadMetricsQueue(), tableName);
+        final PhoenixConnection conn = context.getConnection();
+        final long renewLeaseThreshold = conn.getQueryServices().getRenewLeaseThresholdMilliSeconds();
+        int expectedListSize = nestedScans.size() * 10;
+        List<Scan> flattenedScans = Lists.newArrayListWithExpectedSize(expectedListSize);
+        for (List<Scan> list : nestedScans) {
+            flattenedScans.addAll(list);
+        }
+        if (!flattenedScans.isEmpty()) {
+            if (isReverse) {
+                flattenedScans = Lists.reverse(flattenedScans);
+            }
+            final List<Scan> finalScans = flattenedScans;
             Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
 
                 @Override
                 public PeekingResultIterator call() throws Exception {
-                    PeekingResultIterator previousIterator = null;
-                    List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(scans.size());
-                    for (final Scan scan : scans) {
-                        TableResultIterator scanner = new TableResultIterator(mutationState, tableRef, scan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), renewLeaseThreshold, previousIterator);
-                        conn.addIterator(scanner);
-                        PeekingResultIterator iterator = iteratorFactory.newIterator(context, scanner, scan, tableName);
-                        concatIterators.add(iterator);
-                        previousIterator = iterator;
-                    }
-                    PeekingResultIterator concatIterator = ConcatResultIterator.newIterator(concatIterators);
-                    allIterators.add(concatIterator);
-                    return concatIterator;
+                    PeekingResultIterator itr = new SerialIterator(finalScans, tableName, renewLeaseThreshold, offset);
+                    return itr;
                 }
 
                 /**
@@ -117,7 +110,7 @@ public class SerialIterators extends BaseResultIterators {
                 }
             }, "Serial scanner for table: " + tableRef.getTable().getPhysicalName().getString()));
             // Add our singleton Future which will execute serially
-            nestedFutures.add(Collections.singletonList(new Pair<Scan,Future<PeekingResultIterator>>(overallScan,future)));
+            nestedFutures.add(Collections.singletonList(new Pair<Scan, Future<PeekingResultIterator>>(flattenedScans.get(0), future)));
         }
     }
 
@@ -125,4 +118,89 @@ public class SerialIterators extends BaseResultIterators {
     protected String getName() {
         return NAME;
     }
+
+    /**
+     *
+     * Iterator that creates iterators for scans only when needed.
+     * This helps reduce the cost of pre-constructing all the iterators
+     * which we may not even use.
+     */
+    private class SerialIterator implements PeekingResultIterator {
+        private final List<Scan> scans;
+        private final String tableName;
+        private final long renewLeaseThreshold;
+        private int index;
+        private PeekingResultIterator currentIterator;
+        private Integer remainingOffset;
+
+        private SerialIterator(List<Scan> flattenedScans, String tableName, long renewLeaseThreshold, Integer offset) throws SQLException {
+            this.scans = Lists.newArrayListWithExpectedSize(flattenedScans.size());
+            this.tableName = tableName;
+            this.renewLeaseThreshold = renewLeaseThreshold;
+            this.scans.addAll(flattenedScans);
+            this.remainingOffset = offset;
+            if (this.remainingOffset != null) {
+                // mark the last scan for offset purposes
+                this.scans.get(this.scans.size() - 1).setAttribute(QueryConstants.LAST_SCAN, Bytes.toBytes(Boolean.TRUE));
+            }
+        }
+
+        private PeekingResultIterator currentIterator() throws SQLException {
+            if (currentIterator == null) {
+                return currentIterator = nextIterator();
+            }
+            if (currentIterator.peek() == null) {
+                currentIterator.close();
+                currentIterator = nextIterator();
+            }
+            return currentIterator;
+        }
+
+        private PeekingResultIterator nextIterator() throws SQLException {
+            if (index >= scans.size()) {
+                return EMPTY_ITERATOR;
+            }
+            while (index < scans.size()) {
+                Scan currentScan = scans.get(index++);
+                if (remainingOffset != null) {
+                    currentScan.setAttribute(BaseScannerRegionObserver.SCAN_OFFSET, PInteger.INSTANCE.toBytes(remainingOffset));
+                }
+                TableResultIterator itr = new TableResultIterator(mutationState, tableRef, currentScan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), renewLeaseThreshold);
+                PeekingResultIterator peekingItr = iteratorFactory.newIterator(context, itr, currentScan, tableName);
+                Tuple tuple;
+                if ((tuple = peekingItr.peek()) == null) {
+                    peekingItr.close();
+                    continue;
+                } else if ((remainingOffset = QueryUtil.getRemainingOffset(tuple)) != null) {
+                    peekingItr.next();
+                    peekingItr.close();
+                    continue;
+                }
+                context.getConnection().addIteratorForLeaseRenewal(itr);
+                return peekingItr;
+            }
+            return EMPTY_ITERATOR;
+        }
+
+        @Override
+        public Tuple next() throws SQLException {
+            return currentIterator().next();
+        }
+
+        @Override
+        public void explain(List<String> planSteps) {}
+
+        @Override
+        public void close() throws SQLException {
+            if (currentIterator != null) {
+                currentIterator.close();
+            }
+        }
+
+        @Override
+        public Tuple peek() throws SQLException {
+            return currentIterator().peek();
+        }
+
+    }
 }
\ No newline at end of file
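[The new SerialIterator opens a TableResultIterator only when the previous scan is exhausted, so scans beyond a satisfied LIMIT are never started. The same lazy-concatenation pattern in plain Java, stripped of the Phoenix types — a generic sketch, not Phoenix API:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;
    import java.util.function.Supplier;

    public class LazyConcatIterator<T> implements Iterator<T> {
        private final Iterator<Supplier<Iterator<T>>> sources;
        private Iterator<T> current;

        public LazyConcatIterator(List<Supplier<Iterator<T>>> sources) {
            this.sources = sources.iterator();
        }

        @Override
        public boolean hasNext() {
            // Build the next sub-iterator lazily, only once the current one is drained
            while ((current == null || !current.hasNext()) && sources.hasNext()) {
                current = sources.next().get();
            }
            return current != null && current.hasNext();
        }

        @Override
        public T next() {
            if (!hasNext()) { throw new NoSuchElementException(); }
            return current.next();
        }

        public static void main(String[] args) {
            List<Supplier<Iterator<Integer>>> sources = Arrays.asList(
                    () -> Arrays.asList(1, 2).iterator(),
                    Collections::emptyIterator,   // an empty source is skipped transparently
                    () -> Arrays.asList(3).iterator());
            Iterator<Integer> it = new LazyConcatIterator<>(sources);
            while (it.hasNext()) { System.out.print(it.next() + " "); } // prints: 1 2 3
        }
    }
]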
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
index 0a3c32b..4f85a5f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
@@ -55,15 +55,28 @@ import org.apache.phoenix.util.TupleUtil;
  *
  * Result iterator that spools the results of a scan to disk once an in-memory threshold has been reached.
  * If the in-memory threshold is not reached, the results are held in memory with no disk writing performed.
- *
- *
+ *
+ * <p>
+ * Spooling is deprecated and shouldn't be used while implementing new features. As of HBase 0.98.17,
+ * we rely on pacing the server side scanners instead of pulling rows from the server and potentially
+ * spooling to a temporary file created on clients.
+ * </p>
+ *
  * @since 0.1
  */
+@Deprecated
 public class SpoolingResultIterator implements PeekingResultIterator {
 
     private final PeekingResultIterator spoolFrom;
     private final SpoolingMetricsHolder spoolMetrics;
     private final MemoryMetricsHolder memoryMetrics;
+
+    /**
+     * Spooling is deprecated and shouldn't be used while implementing new features. As of HBase
+     * 0.98.17, we rely on pacing the server side scanners instead of pulling rows from the server
+     * and potentially spooling to a temporary file created on clients.
+     */
+    @Deprecated
     public static class SpoolingResultIteratorFactory implements ParallelIteratorFactory {
         private final QueryServices services;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index a86f899..a7e3068 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -32,14 +32,12 @@ import javax.annotation.concurrent.GuardedBy;
 import org.apache.hadoop.hbase.client.AbstractClientScanner;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.monitoring.CombinableMetric;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.Closeables;
-import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ServerUtil;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -68,8 +66,7 @@ public class TableResultIterator implements ResultIterator {
     @GuardedBy("this")
     private long renewLeaseTime = 0;
 
-    private PeekingResultIterator previousIterator;
-
+    @VisibleForTesting // Exposed for testing. DON'T USE ANYWHERE ELSE!
     TableResultIterator() {
         this.scanMetrics = null;
@@ -83,19 +80,13 @@ public class TableResultIterator implements ResultIterator {
     };
 
     public TableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics,
-            long renewLeaseThreshold) throws SQLException {
-        this(mutationState,tableRef,scan,scanMetrics,renewLeaseThreshold,null);
-    }
-
-    public TableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics,
-            long renewLeaseThreshold, PeekingResultIterator previousIterator) throws SQLException {
+            long renewLeaseThreshold) throws SQLException {
         this.scan = scan;
         this.scanMetrics = scanMetrics;
         PTable table = tableRef.getTable();
         htable = mutationState.getHTable(table);
         this.scanIterator = UNINITIALIZED_SCANNER;
         this.renewLeaseThreshold = renewLeaseThreshold;
-        this.previousIterator = previousIterator;
     }
 
     @Override
@@ -127,15 +118,6 @@ public class TableResultIterator implements ResultIterator {
         ResultIterator delegate = this.scanIterator;
         if (delegate == UNINITIALIZED_SCANNER) {
             try {
-                if (previousIterator != null && scan.getAttribute(BaseScannerRegionObserver.SCAN_OFFSET) != null) {
-                    byte[] unusedOffset = QueryUtil.getUnusedOffset(previousIterator.peek());
-                    if (unusedOffset != null) {
-                        scan.setAttribute(BaseScannerRegionObserver.SCAN_OFFSET, unusedOffset);
-                        previousIterator.next();
-                    } else {
-                        scan.setAttribute(BaseScannerRegionObserver.SCAN_OFFSET, null);
-                    }
-                }
                 this.scanIterator =
                         new ScanningResultIterator(htable.getScanner(scan), scanMetrics);
             } catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index b205da4..0e69bb5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -58,6 +58,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import co.cask.tephra.TransactionContext;
+
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Consistency;
 import org.apache.htrace.Sampler;
@@ -77,6 +79,7 @@ import org.apache.phoenix.jdbc.PhoenixStatement.PhoenixStatementParser;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.PSchema;
 import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.ConnectionQueryServices.Feature;
 import org.apache.phoenix.query.DelegateConnectionQueryServices;
 import org.apache.phoenix.query.MetaDataMutated;
 import org.apache.phoenix.query.QueryConstants;
@@ -119,8 +122,6 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap.Builder;
 import com.google.common.collect.Lists;
 
-import co.cask.tephra.TransactionContext;
-
 /**
  *
@@ -1043,8 +1044,8 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLCloseable {
         this.parallelIteratorFactory = parallelIteratorFactory;
     }
 
-    public void addIterator(@Nonnull TableResultIterator itr) {
-        if (services.supportsFeature(ConnectionQueryServices.Feature.RENEW_LEASE)) {
+    public void addIteratorForLeaseRenewal(@Nonnull TableResultIterator itr) {
+        if (services.supportsFeature(Feature.RENEW_LEASE)) {
             checkNotNull(itr);
             scannerQueue.add(new WeakReference<TableResultIterator>(itr));
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 450ccfe..36aa8cd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -144,7 +144,7 @@ public interface QueryConstants {
     public final static String PHOENIX_METADATA = "table";
     public final static String OFFSET_ROW_KEY = "_OFFSET_";
     public final static byte[] OFFSET_ROW_KEY_BYTES = Bytes.toBytes(OFFSET_ROW_KEY);
-    public final static ImmutableBytesPtr offsetRowKeyPtr = new ImmutableBytesPtr(OFFSET_ROW_KEY_BYTES);
+    public final static ImmutableBytesPtr OFFSET_ROW_KEY_PTR = new ImmutableBytesPtr(OFFSET_ROW_KEY_BYTES);
 
     public final static PName SINGLE_COLUMN_NAME = PNameFactory.newNormalizedName("s");
     public final static PName SINGLE_COLUMN_FAMILY_NAME = PNameFactory.newNormalizedName("s");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 665e77f..f4300fd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -1226,6 +1226,7 @@ public class PTableImpl implements PTable {
         return isTransactional;
     }
 
+    @Override
     public int getBaseColumnCount() {
         return baseColumnCount;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index ded20e9..daab198 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -48,7 +48,9 @@ import org.apache.phoenix.parse.WildcardParseNode;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PInteger;
 
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
@@ -369,13 +371,13 @@ public final class QueryUtil {
 
     }
 
-    public static byte[] getUnusedOffset(Tuple offsetTuple) {
+    public static Integer getRemainingOffset(Tuple offsetTuple) {
         if (offsetTuple != null) {
             ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr();
             offsetTuple.getKey(rowKeyPtr);
-            if (QueryConstants.offsetRowKeyPtr.compareTo(rowKeyPtr) == 0) {
-                Cell value = offsetTuple.getValue(QueryConstants.OFFSET_FAMILY, QueryConstants.OFFSET_COLUMN);
-                return value.getValue();
+            if (QueryConstants.OFFSET_ROW_KEY_PTR.compareTo(rowKeyPtr) == 0) {
+                Cell cell = offsetTuple.getValue(QueryConstants.OFFSET_FAMILY, QueryConstants.OFFSET_COLUMN);
+                return (Integer) PInteger.INSTANCE.toObject(cell.getValueArray(), cell.getValueOffset(),
+                        cell.getValueLength(), PInteger.INSTANCE, SortOrder.ASC, null, null);
             }
         }
         return null;
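[A closing note on the addIterator() -> addIteratorForLeaseRenewal() rename: the new name makes explicit that the connection only tracks these iterators through weak references, so registering a scanner for lease renewal never keeps a discarded scanner from being garbage collected. A minimal sketch of that bookkeeping pattern — illustrative only, not the Phoenix implementation:

    import java.lang.ref.WeakReference;
    import java.util.Iterator;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    class LeaseRenewalRegistry<T> {
        private final Queue<WeakReference<T>> scanners = new ConcurrentLinkedQueue<>();

        void register(T scanner) {
            // Weak reference: registration must not pin a closed scanner in memory
            scanners.add(new WeakReference<>(scanner));
        }

        void renewLeases() {
            for (Iterator<WeakReference<T>> it = scanners.iterator(); it.hasNext();) {
                T scanner = it.next().get();
                if (scanner == null) {
                    it.remove(); // already collected; drop the stale slot
                } else {
                    // renew the scanner's HBase lease here
                }
            }
        }
    }
]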