Repository: phoenix Updated Branches: refs/heads/master 44c00345b -> 7865a59b0
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinCacheNotFoundException.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinCacheNotFoundException.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinCacheNotFoundException.java new file mode 100644 index 0000000..6d67348 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinCacheNotFoundException.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.coprocessor; + +import java.sql.SQLException; + +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; + +public class HashJoinCacheNotFoundException extends SQLException{ + private static final long serialVersionUID = 1L; + private Long cacheId; + private static SQLExceptionCode ERROR_CODE = SQLExceptionCode.HASH_JOIN_CACHE_NOT_FOUND; + public HashJoinCacheNotFoundException() { + this(null); + } + + public HashJoinCacheNotFoundException(Long cacheId) { + super(new SQLExceptionInfo.Builder(ERROR_CODE).setMessage("joinId: " + cacheId + + ". The cache might have expired and have been removed.").build().toString(), + ERROR_CODE.getSQLState(), ERROR_CODE.getErrorCode(), null); + this.cacheId=cacheId; + } + + public Long getCacheId(){ + return this.cacheId; + } + + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java index ced29bd..5061d94 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java @@ -46,6 +46,7 @@ import org.apache.phoenix.schema.ValueBitSet; import org.apache.phoenix.schema.tuple.PositionBasedResultTuple; import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.TupleUtil; @@ -97,10 +98,12 @@ public class HashJoinRegionScanner implements RegionScanner { continue; } HashCache hashCache = (HashCache)cache.getServerCache(joinId); - if (hashCache == null) - throw new DoNotRetryIOException("Could not find hash cache for joinId: " - + Bytes.toString(joinId.get(), joinId.getOffset(), joinId.getLength()) - + ". The cache might have expired and have been removed."); + if (hashCache == null) { + Exception cause = new HashJoinCacheNotFoundException( + Bytes.toLong(ByteUtil.copyKeyBytesIfNecessary(joinId))); + throw new DoNotRetryIOException(cause.getMessage(), cause); + } + hashCaches[i] = hashCache; tempSrcBitSet[i] = ValueBitSet.newInstance(joinInfo.getSchemas()[i]); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index ff4a35c..c9dce27 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -439,7 +439,8 @@ public enum SQLExceptionCode { " of connections to the target cluster."), MAX_MUTATION_SIZE_EXCEEDED(729, "LIM01", "MutationState size is bigger than maximum allowed number of rows"), - MAX_MUTATION_SIZE_BYTES_EXCEEDED(730, "LIM02", "MutationState size is bigger than maximum allowed number of bytes"); + MAX_MUTATION_SIZE_BYTES_EXCEEDED(730, "LIM02", "MutationState size is bigger than maximum allowed number of bytes"), + HASH_JOIN_CACHE_NOT_FOUND(900, "HJ01", "Hash Join cache not found"); private final int errorCode; private final String sqlState; http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index 2cdaac7..74c8d39 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@ -21,9 +21,10 @@ package org.apache.phoenix.execute; import java.sql.SQLException; import java.util.Collections; import java.util.List; +import java.util.Map; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.compile.QueryPlan; @@ -36,6 +37,7 @@ import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.OrderByExpression; import org.apache.phoenix.expression.RowKeyExpression; import org.apache.phoenix.expression.aggregator.Aggregators; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.iterate.AggregatingResultIterator; import org.apache.phoenix.iterate.BaseResultIterators; import org.apache.phoenix.iterate.ConcatResultIterator; @@ -185,7 +187,7 @@ public class AggregatePlan extends BaseQueryPlan { } @Override - protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { + protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { if (groupBy.isEmpty()) { UngroupedAggregateRegionObserver.serializeIntoScan(scan); } else { @@ -221,8 +223,8 @@ public class AggregatePlan extends BaseQueryPlan { } } BaseResultIterators iterators = isSerial - ? new SerialIterators(this, null, null, wrapParallelIteratorFactory(), scanGrouper, scan) - : new ParallelIterators(this, null, wrapParallelIteratorFactory(), scan, false); + ? new SerialIterators(this, null, null, wrapParallelIteratorFactory(), scanGrouper, scan, caches) + : new ParallelIterators(this, null, wrapParallelIteratorFactory(), scan, false, caches); estimatedRows = iterators.getEstimatedRowCount(); estimatedSize = iterators.getEstimatedByteCount(); splits = iterators.getSplits(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index e6e7b97..238a537 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -24,6 +24,7 @@ import java.sql.ParameterMetaData; import java.sql.SQLException; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.commons.logging.Log; @@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableUtils; import org.apache.htrace.TraceScope; +import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.ExplainPlan; import org.apache.phoenix.compile.FromCompiler; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; @@ -47,6 +49,7 @@ import org.apache.phoenix.compile.WhereCompiler; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.ProjectedColumnExpression; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.iterate.DefaultParallelScanGrouper; import org.apache.phoenix.iterate.DelegateResultIterator; @@ -73,7 +76,6 @@ import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.LogUtil; -import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; import org.apache.phoenix.util.ScanUtil; @@ -196,20 +198,20 @@ public abstract class BaseQueryPlan implements QueryPlan { @Override public final ResultIterator iterator() throws SQLException { - return iterator(Collections.<SQLCloseable>emptyList(), DefaultParallelScanGrouper.getInstance(), null); + return iterator(DefaultParallelScanGrouper.getInstance()); } @Override public final ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException { - return iterator(Collections.<SQLCloseable>emptyList(), scanGrouper, null); + return iterator(scanGrouper, null); } @Override public final ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { - return iterator(Collections.<SQLCloseable>emptyList(), scanGrouper, scan); + return iterator(Collections.<ImmutableBytesPtr,ServerCache>emptyMap(), scanGrouper, scan); } - - private ResultIterator getWrappedIterator(final List<? extends SQLCloseable> dependencies, + + private ResultIterator getWrappedIterator(final Map<ImmutableBytesPtr,ServerCache> dependencies, ResultIterator iterator) { ResultIterator wrappedIterator = dependencies.isEmpty() ? iterator : new DelegateResultIterator(iterator) { @Override @@ -217,14 +219,15 @@ public abstract class BaseQueryPlan implements QueryPlan { try { super.close(); } finally { - SQLCloseables.closeAll(dependencies); + SQLCloseables.closeAll(dependencies.values()); } } }; return wrappedIterator; } - public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies, ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { + public final ResultIterator iterator(final Map<ImmutableBytesPtr,ServerCache> caches, + ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { if (scan == null) { scan = context.getScan(); } @@ -235,11 +238,11 @@ public abstract class BaseQueryPlan implements QueryPlan { * row to be scanned. */ if (context.getScanRanges() == ScanRanges.NOTHING && !getStatement().isAggregate()) { - return getWrappedIterator(dependencies, ResultIterator.EMPTY_ITERATOR); + return getWrappedIterator(caches, ResultIterator.EMPTY_ITERATOR); } if (tableRef == TableRef.EMPTY_TABLE_REF) { - return getWrappedIterator(dependencies, newIterator(scanGrouper, scan)); + return newIterator(scanGrouper, scan, caches); } // Set miscellaneous scan attributes. This is the last chance to set them before we @@ -344,7 +347,7 @@ public abstract class BaseQueryPlan implements QueryPlan { LOG.debug(LogUtil.addCustomAnnotations("Scan ready for iteration: " + scan, connection)); } - ResultIterator iterator = getWrappedIterator(dependencies, newIterator(scanGrouper, scan)); + ResultIterator iterator = newIterator(scanGrouper, scan, caches); if (LOG.isDebugEnabled()) { LOG.debug(LogUtil.addCustomAnnotations("Iterator ready: " + iterator, connection)); } @@ -468,7 +471,7 @@ public abstract class BaseQueryPlan implements QueryPlan { } } - abstract protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException; + abstract protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException; @Override public long getEstimatedSize() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java index 17c3cca..879aa61 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java @@ -24,6 +24,7 @@ import static org.apache.phoenix.util.NumberUtil.add; import java.sql.SQLException; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; @@ -51,6 +52,7 @@ import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.InListExpression; import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.expression.RowValueConstructorExpression; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.iterate.FilterResultIterator; import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; @@ -73,10 +75,10 @@ import org.apache.phoenix.schema.types.PArrayDataType; import org.apache.phoenix.schema.types.PBoolean; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PVarbinary; -import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; public class HashJoinPlan extends DelegateQueryPlan { @@ -88,7 +90,7 @@ public class HashJoinPlan extends DelegateQueryPlan { private final boolean recompileWhereClause; private final Set<TableRef> tableRefs; private final int maxServerCacheTimeToLive; - private final List<SQLCloseable> dependencies = Lists.newArrayList(); + private final Map<ImmutableBytesPtr,ServerCache> dependencies = Maps.newHashMap(); private HashCacheClient hashClient; private AtomicLong firstJobEndTime; private List<Expression> keyRangeExpressions; @@ -99,7 +101,7 @@ public class HashJoinPlan extends DelegateQueryPlan { public static HashJoinPlan create(SelectStatement statement, QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans) throws SQLException { if (!(plan instanceof HashJoinPlan)) - return new HashJoinPlan(statement, plan, joinInfo, subPlans, joinInfo == null, Collections.<SQLCloseable>emptyList()); + return new HashJoinPlan(statement, plan, joinInfo, subPlans, joinInfo == null, Collections.<ImmutableBytesPtr,ServerCache>emptyMap()); HashJoinPlan hashJoinPlan = (HashJoinPlan) plan; assert (hashJoinPlan.joinInfo == null && hashJoinPlan.delegate instanceof BaseQueryPlan); @@ -115,9 +117,9 @@ public class HashJoinPlan extends DelegateQueryPlan { } private HashJoinPlan(SelectStatement statement, - QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean recompileWhereClause, List<SQLCloseable> dependencies) throws SQLException { + QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean recompileWhereClause, Map<ImmutableBytesPtr,ServerCache> dependencies) throws SQLException { super(plan); - this.dependencies.addAll(dependencies); + this.dependencies.putAll(dependencies); this.statement = statement; this.joinInfo = joinInfo; this.subPlans = subPlans; @@ -182,7 +184,7 @@ public class HashJoinPlan extends DelegateQueryPlan { try { ServerCache result = futures.get(i).get(); if (result != null) { - dependencies.add(result); + dependencies.put(new ImmutableBytesPtr(result.getId()),result); } subPlans[i].postProcess(result, this); } catch (InterruptedException e) { @@ -198,7 +200,7 @@ public class HashJoinPlan extends DelegateQueryPlan { } } if (firstException != null) { - SQLCloseables.closeAllQuietly(dependencies); + SQLCloseables.closeAllQuietly(dependencies.values()); throw firstException; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java index 1853c45..781c07e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java @@ -21,13 +21,16 @@ import java.sql.SQLException; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.iterate.ParallelIteratorFactory; import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; @@ -37,6 +40,7 @@ import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.SQLCloseables; public class LiteralResultIterationPlan extends BaseQueryPlan { protected final Iterable<Tuple> tuples; @@ -71,7 +75,7 @@ public class LiteralResultIterationPlan extends BaseQueryPlan { } @Override - protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) + protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan, final Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { ResultIterator scanner = new ResultIterator() { private final Iterator<Tuple> tupleIterator = tuples.iterator(); @@ -81,7 +85,8 @@ public class LiteralResultIterationPlan extends BaseQueryPlan { @Override public void close() throws SQLException { - this.closed = true;; + SQLCloseables.closeAll(caches.values()); + this.closed = true; } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java index 2990b77..f5b1af0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java @@ -24,10 +24,12 @@ import static org.apache.phoenix.util.ScanUtil.isRoundRobinPossible; import java.sql.SQLException; import java.util.Collections; import java.util.List; +import java.util.Map; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.compile.RowProjector; @@ -35,6 +37,7 @@ import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.coprocessor.ScanRegionObserver; import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.iterate.BaseResultIterators; import org.apache.phoenix.iterate.ChunkedResultIterator; import org.apache.phoenix.iterate.ConcatResultIterator; @@ -206,7 +209,7 @@ public class ScanPlan extends BaseQueryPlan { } @Override - protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { + protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { // Set any scan attributes before creating the scanner, as it will be too late afterwards scan.setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, QueryConstants.TRUE); ResultIterator scanner; @@ -229,11 +232,11 @@ public class ScanPlan extends BaseQueryPlan { && isDataToScanWithinThreshold; BaseResultIterators iterators; if (isOffsetOnServer) { - iterators = new SerialIterators(this, perScanLimit, offset, parallelIteratorFactory, scanGrouper, scan); + iterators = new SerialIterators(this, perScanLimit, offset, parallelIteratorFactory, scanGrouper, scan, caches); } else if (isSerial) { - iterators = new SerialIterators(this, perScanLimit, null, parallelIteratorFactory, scanGrouper, scan); + iterators = new SerialIterators(this, perScanLimit, null, parallelIteratorFactory, scanGrouper, scan, caches); } else { - iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper, scan, initFirstScanOnly); + iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper, scan, initFirstScanOnly, caches); } estimatedRows = iterators.getEstimatedRowCount(); estimatedSize = iterators.getEstimatedByteCount(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 98f5d46..a3a9762 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -66,11 +66,14 @@ import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.cache.ServerCacheClient; +import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException; import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; @@ -80,11 +83,13 @@ import org.apache.phoenix.filter.DistinctPrefixFilter; import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.VersionUtil; +import org.apache.phoenix.join.HashCacheClient; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.parse.HintNode; import org.apache.phoenix.parse.HintNode.Hint; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.ColumnFamilyNotFoundException; @@ -133,7 +138,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result private static final Logger logger = LoggerFactory.getLogger(BaseResultIterators.class); private static final int ESTIMATED_GUIDEPOSTS_PER_REGION = 20; private static final int MIN_SEEK_TO_COLUMN_VERSION = VersionUtil.encodeVersion("0", "98", "12"); - private final List<List<Scan>> scans; private final List<KeyRange> splits; private final byte[] physicalTableName; @@ -148,6 +152,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result private boolean hasGuidePosts; private Scan scan; private boolean useStatsForParallelization; + protected Map<ImmutableBytesPtr,ServerCache> caches; static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() { @Override @@ -464,11 +469,12 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } } - public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer offset, ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { + public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer offset, ParallelScanGrouper scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(), plan.getStatement().getHint(), QueryUtil.getOffsetLimit(plan.getLimit(), plan.getOffset()), offset); this.plan = plan; this.scan = scan; + this.caches = caches; this.scanGrouper = scanGrouper; StatementContext context = plan.getContext(); // Clone MutationState as the one on the connection will change if auto commit is on @@ -844,7 +850,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numScans); ScanWrapper previousScan = new ScanWrapper(null); return getIterators(scans, services, isLocalIndex, allIterators, iterators, isReverse, maxQueryEndTime, - splits.size(), previousScan); + splits.size(), previousScan, context.getConnection().getQueryServices().getConfiguration() + .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES)); } class ScanWrapper { @@ -866,11 +873,12 @@ public abstract class BaseResultIterators extends ExplainTable implements Result private List<PeekingResultIterator> getIterators(List<List<Scan>> scan, ConnectionQueryServices services, boolean isLocalIndex, Queue<PeekingResultIterator> allIterators, List<PeekingResultIterator> iterators, - boolean isReverse, long maxQueryEndTime, int splitSize, ScanWrapper previousScan) throws SQLException { + boolean isReverse, long maxQueryEndTime, int splitSize, ScanWrapper previousScan, int retryCount) throws SQLException { boolean success = false; final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(splitSize); allFutures.add(futures); SQLException toThrow = null; + final HashCacheClient hashCacheClient = new HashCacheClient(context.getConnection()); int queryTimeOut = context.getStatement().getQueryTimeoutInMillis(); try { submitWork(scan, futures, allIterators, splitSize, isReverse, scanGrouper); @@ -900,24 +908,38 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } catch (ExecutionException e) { try { // Rethrow as SQLException throw ServerUtil.parseServerException(e); - } catch (StaleRegionBoundaryCacheException e2) { + } catch (StaleRegionBoundaryCacheException | HashJoinCacheNotFoundException e2){ // Catch only to try to recover from region boundary cache being out of date if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries services.clearTableRegionCache(physicalTableName); context.getOverallQueryMetrics().cacheRefreshedDueToSplits(); } + // Resubmit just this portion of work again + Scan oldScan = scanPair.getFirst(); + byte[] startKey = oldScan.getAttribute(SCAN_ACTUAL_START_ROW); + if(e2 instanceof HashJoinCacheNotFoundException){ + logger.debug( + "Retrying when Hash Join cache is not found on the server ,by sending the cache again"); + if(retryCount<=0){ + throw e2; + } + Long cacheId = ((HashJoinCacheNotFoundException)e2).getCacheId(); + if (!hashCacheClient.addHashCacheToServer(startKey, + caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId))), plan.getTableRef().getTable())) { throw e2; } + } concatIterators = recreateIterators(services, isLocalIndex, allIterators, iterators, isReverse, maxQueryEndTime, previousScan, - clearedCache, concatIterators, scanPairItr, scanPair); + clearedCache, concatIterators, scanPairItr, scanPair, retryCount-1); } catch(ColumnFamilyNotFoundException cfnfe) { if (scanPair.getFirst().getAttribute(LOCAL_INDEX_BUILD) != null) { Thread.sleep(1000); concatIterators = recreateIterators(services, isLocalIndex, allIterators, iterators, isReverse, maxQueryEndTime, previousScan, - clearedCache, concatIterators, scanPairItr, scanPair); + clearedCache, concatIterators, scanPairItr, scanPair, retryCount); } + } } } @@ -976,7 +998,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result ScanWrapper previousScan, boolean clearedCache, List<PeekingResultIterator> concatIterators, Iterator<Pair<Scan, Future<PeekingResultIterator>>> scanPairItr, - Pair<Scan, Future<PeekingResultIterator>> scanPair) throws SQLException { + Pair<Scan, Future<PeekingResultIterator>> scanPair, int retryCount) throws SQLException { scanPairItr.remove(); // Resubmit just this portion of work again Scan oldScan = scanPair.getFirst(); @@ -989,20 +1011,21 @@ public abstract class BaseResultIterators extends ExplainTable implements Result addIterator(iterators, concatIterators); concatIterators = Lists.newArrayList(); getIterators(newNestedScans, services, isLocalIndex, allIterators, iterators, isReverse, - maxQueryEndTime, newNestedScans.size(), previousScan); + maxQueryEndTime, newNestedScans.size(), previousScan, retryCount); return concatIterators; } @Override public void close() throws SQLException { - if (allFutures.isEmpty()) { - return; - } + // Don't call cancel on already started work, as it causes the HConnection // to get into a funk. Instead, just cancel queued work. boolean cancelledWork = false; try { + if (allFutures.isEmpty()) { + return; + } List<Future<PeekingResultIterator>> futuresToClose = Lists.newArrayListWithExpectedSize(getSplits().size()); for (List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures : allFutures) { for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) { @@ -1037,6 +1060,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } } } finally { + SQLCloseables.closeAllQuietly(caches.values()); + caches.clear(); if (cancelledWork) { context.getConnection().getQueryServices().getExecutor().purge(); } @@ -1146,4 +1171,4 @@ public abstract class BaseResultIterators extends ExplainTable implements Result return "ResultIterators [name=" + getName() + ",id=" + scanId + ",scans=" + scans + "]"; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java index 976b839..44c714f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java @@ -18,10 +18,13 @@ package org.apache.phoenix.iterate; import java.sql.SQLException; +import java.util.Map; import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.monitoring.ScanMetricsHolder; import org.apache.phoenix.schema.TableRef; @@ -30,9 +33,9 @@ public class DefaultTableResultIteratorFactory implements TableResultIteratorFac @Override public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold, - QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException { + QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { return new TableResultIterator(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, - plan, scanGrouper); + plan, scanGrouper, caches); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index f0360e2..3c11f4a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@ -22,15 +22,17 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_NUM_PARAL import java.sql.SQLException; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.job.JobManager.JobCallable; -import org.apache.phoenix.monitoring.MetricType; import org.apache.phoenix.monitoring.ReadMetricQueue; import org.apache.phoenix.monitoring.ScanMetricsHolder; import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder; @@ -55,16 +57,16 @@ public class ParallelIterators extends BaseResultIterators { private final ParallelIteratorFactory iteratorFactory; private final boolean initFirstScanOnly; - public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan, boolean initFirstScanOnly) + public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan, boolean initFirstScanOnly, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { - super(plan, perScanLimit, null, scanGrouper, scan); + super(plan, perScanLimit, null, scanGrouper, scan,caches); this.iteratorFactory = iteratorFactory; this.initFirstScanOnly = initFirstScanOnly; } - public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, Scan scan, boolean initOneScanPerRegion) + public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, Scan scan, boolean initOneScanPerRegion, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { - this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance(), scan, initOneScanPerRegion); + this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance(), scan, initOneScanPerRegion, caches); } @Override @@ -106,7 +108,7 @@ public class ParallelIterators extends BaseResultIterators { final TableResultIterator tableResultItr = context.getConnection().getTableResultIteratorFactory().newIterator( mutationState, tableRef, scan, scanMetricsHolder, renewLeaseThreshold, plan, - scanGrouper); + scanGrouper, caches); context.getConnection().addIteratorForLeaseRenewal(tableResultItr); Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java index eb0c949..26d1ed1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java @@ -20,6 +20,7 @@ package org.apache.phoenix.iterate; import java.sql.SQLException; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -27,8 +28,10 @@ import java.util.concurrent.Future; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.job.JobManager.JobCallable; import org.apache.phoenix.monitoring.ReadMetricQueue; @@ -59,9 +62,9 @@ public class SerialIterators extends BaseResultIterators { private final Integer offset; public SerialIterators(QueryPlan plan, Integer perScanLimit, Integer offset, - ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan) + ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { - super(plan, perScanLimit, offset, scanGrouper, scan); + super(plan, perScanLimit, offset, scanGrouper, scan, caches); this.offset = offset; // must be a offset or a limit specified or a SERIAL hint Preconditions.checkArgument( @@ -90,7 +93,7 @@ public class SerialIterators extends BaseResultIterators { Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() { @Override public PeekingResultIterator call() throws Exception { - PeekingResultIterator itr = new SerialIterator(finalScans, tableName, renewLeaseThreshold, offset); + PeekingResultIterator itr = new SerialIterator(finalScans, tableName, renewLeaseThreshold, offset, caches); return itr; } @@ -140,13 +143,15 @@ public class SerialIterators extends BaseResultIterators { private int index; private PeekingResultIterator currentIterator; private Integer remainingOffset; + private Map<ImmutableBytesPtr,ServerCache> caches; - private SerialIterator(List<Scan> flattenedScans, String tableName, long renewLeaseThreshold, Integer offset) throws SQLException { + private SerialIterator(List<Scan> flattenedScans, String tableName, long renewLeaseThreshold, Integer offset, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { this.scans = Lists.newArrayListWithExpectedSize(flattenedScans.size()); this.tableName = tableName; this.renewLeaseThreshold = renewLeaseThreshold; this.scans.addAll(flattenedScans); this.remainingOffset = offset; + this.caches = caches; if (this.remainingOffset != null) { // mark the last scan for offset purposes this.scans.get(this.scans.size() - 1).setAttribute(QueryConstants.LAST_SCAN, Bytes.toBytes(Boolean.TRUE)); @@ -180,7 +185,7 @@ public class SerialIterators extends BaseResultIterators { isRequestMetricsEnabled); TableResultIterator itr = new TableResultIterator(mutationState, currentScan, scanMetricsHolder, - renewLeaseThreshold, plan, scanGrouper); + renewLeaseThreshold, plan, scanGrouper, caches); PeekingResultIterator peekingItr = iteratorFactory.newIterator(context, itr, currentScan, tableName, plan); Tuple tuple; if ((tuple = peekingItr.peek()) == null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index 5114acc..e812854 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -30,6 +30,7 @@ import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.UN import java.io.IOException; import java.sql.SQLException; import java.util.List; +import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -39,9 +40,17 @@ import org.apache.hadoop.hbase.client.AbstractClientScanner; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.cache.ServerCacheClient; +import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException; +import org.apache.phoenix.execute.BaseQueryPlan; import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.join.HashCacheClient; import org.apache.phoenix.monitoring.ScanMetricsHolder; +import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; import org.apache.phoenix.schema.tuple.Tuple; @@ -50,6 +59,8 @@ import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; @@ -70,9 +81,9 @@ public class TableResultIterator implements ResultIterator { private final long renewLeaseThreshold; private final QueryPlan plan; private final ParallelScanGrouper scanGrouper; + private static final Logger logger = LoggerFactory.getLogger(TableResultIterator.class); private Tuple lastTuple = null; private ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - @GuardedBy("renewLeaseLock") private ResultIterator scanIterator; @@ -84,6 +95,10 @@ public class TableResultIterator implements ResultIterator { private final Lock renewLeaseLock = new ReentrantLock(); + private int retry; + private Map<ImmutableBytesPtr,ServerCache> caches; + private HashCacheClient hashCacheClient; + @VisibleForTesting // Exposed for testing. DON'T USE ANYWHERE ELSE! TableResultIterator() { this.scanMetricsHolder = null; @@ -92,14 +107,21 @@ public class TableResultIterator implements ResultIterator { this.scan = null; this.plan = null; this.scanGrouper = null; + this.caches = null; + this.retry = 0; } public static enum RenewLeaseStatus { RENEWED, NOT_RENEWED, CLOSED, UNINITIALIZED, THRESHOLD_NOT_REACHED, LOCK_NOT_ACQUIRED, NOT_SUPPORTED }; - + public TableResultIterator(MutationState mutationState, Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException { + this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, null); + } + + public TableResultIterator(MutationState mutationState, Scan scan, ScanMetricsHolder scanMetricsHolder, + long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper,Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { this.scan = scan; this.scanMetricsHolder = scanMetricsHolder; this.plan = plan; @@ -108,6 +130,10 @@ public class TableResultIterator implements ResultIterator { this.scanIterator = UNINITIALIZED_SCANNER; this.renewLeaseThreshold = renewLeaseThreshold; this.scanGrouper = scanGrouper; + this.hashCacheClient = new HashCacheClient(plan.getContext().getConnection()); + this.caches = caches; + this.retry=plan.getContext().getConnection().getQueryServices().getProps() + .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES); } @Override @@ -145,7 +171,7 @@ public class TableResultIterator implements ResultIterator { } catch (SQLException e) { try { throw ServerUtil.parseServerException(e); - } catch(StaleRegionBoundaryCacheException e1) { + } catch(StaleRegionBoundaryCacheException | HashJoinCacheNotFoundException e1) { if(ScanUtil.isNonAggregateScan(scan)) { // For non aggregate queries if we get stale region boundary exception we can // continue scanning from the next value of lasted fetched result. @@ -163,8 +189,28 @@ public class TableResultIterator implements ResultIterator { } } plan.getContext().getConnection().getQueryServices().clearTableRegionCache(htable.getTableName()); - this.scanIterator = - plan.iterator(scanGrouper, newScan); + if (e1 instanceof HashJoinCacheNotFoundException) { + logger.debug( + "Retrying when Hash Join cache is not found on the server ,by sending the cache again"); + if (retry <= 0) { + throw e1; + } + retry--; + try { + Long cacheId = ((HashJoinCacheNotFoundException) e1).getCacheId(); + if (!hashCacheClient.addHashCacheToServer(newScan.getStartRow(), + caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId))), + plan.getTableRef().getTable())) { + throw e1; + } + this.scanIterator = ((BaseQueryPlan) plan).iterator(caches, scanGrouper, newScan); + + } catch (Exception e2) { + throw new SQLException(e2); + } + } else { + this.scanIterator = plan.iterator(scanGrouper, newScan); + } lastTuple = scanIterator.next(); } else { throw e; http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java index c23e342..0b28d5a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java @@ -18,17 +18,19 @@ package org.apache.phoenix.iterate; import java.sql.SQLException; +import java.util.Map; import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.execute.MutationState; -import org.apache.phoenix.monitoring.CombinableMetric; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.monitoring.ScanMetricsHolder; import org.apache.phoenix.schema.TableRef; public interface TableResultIteratorFactory { public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold, - QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException; + QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java index 32d0469..2ec509c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java @@ -36,6 +36,7 @@ import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PDataType; @@ -56,6 +57,7 @@ import com.google.common.collect.Lists; */ public class HashCacheClient { private final ServerCacheClient serverCache; + /** * Construct client used to create a serialized cached snapshot of a table and send it to each region server * for caching during hash join processing. @@ -81,7 +83,22 @@ public class HashCacheClient { */ ImmutableBytesWritable ptr = new ImmutableBytesWritable(); serialize(ptr, iterator, estimatedSize, onExpressions, singleValueOnly, keyRangeRhsExpression, keyRangeRhsValues); - return serverCache.addServerCache(keyRanges, ptr, ByteUtil.EMPTY_BYTE_ARRAY, new HashCacheFactory(), cacheUsingTableRef); + ServerCache cache = serverCache.addServerCache(keyRanges, ptr, ByteUtil.EMPTY_BYTE_ARRAY, new HashCacheFactory(), cacheUsingTableRef, true); + return cache; + } + + /** + * Should only be used to resend the hash table cache to the regionserver. + * + * @param startkeyOfRegion start key of any region hosted on a regionserver which needs hash cache + * @param cacheId Id of the cache which needs to be sent + * @param pTable + * @return + * @throws Exception + */ + public boolean addHashCacheToServer(byte[] startkeyOfRegion, ServerCache cache, PTable pTable) throws Exception{ + if (cache == null) { return false; } + return serverCache.addServerCache(startkeyOfRegion, cache, new HashCacheFactory(), ByteUtil.EMPTY_BYTE_ARRAY, pTable); } private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, Expression keyRangeRhsExpression, List<Expression> keyRangeRhsValues) throws SQLException { @@ -179,4 +196,5 @@ public class HashCacheClient { // might be coerced later. return new RowValueConstructorExpression(values, false); } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java index e7487a1..7607388 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java @@ -399,4 +399,6 @@ public interface QueryConstants { public static final byte[] OFFSET_COLUMN = "c_offset".getBytes(); public static final String LAST_SCAN = "LAST_SCAN"; public static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes(); + public static final String HASH_JOIN_CACHE_RETRIES = "hashjoin.client.retries.number"; + public static final int DEFAULT_HASH_JOIN_CACHE_RETRIES = 5; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java index 44502a5..d11f3a2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java @@ -25,6 +25,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.util.Arrays; import java.util.Comparator; +import java.util.List; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -577,4 +578,18 @@ public class ByteUtil { throw new IllegalArgumentException("Unknown operator " + op); } } + + public static boolean contains(List<byte[]> keys, byte[] key) { + for (byte[] k : keys) { + if (Arrays.equals(k, key)) { return true; } + } + return false; + } + + public static boolean contains(List<ImmutableBytesPtr> keys, ImmutableBytesPtr key) { + for (ImmutableBytesPtr k : keys) { + if (key.compareTo(k) == 0) { return true; } + } + return false; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java index a08d139..45ef9bf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException; import org.apache.phoenix.exception.PhoenixIOException; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; @@ -45,6 +46,7 @@ public class ServerUtil { private static final String FORMAT = "ERROR %d (%s): %s"; private static final Pattern PATTERN = Pattern.compile("ERROR (\\d+) \\((\\w+)\\): (.*)"); + private static final Pattern HASH_JOIN_EXCEPTION_PATTERN = Pattern.compile("joinId: (-?\\d+)"); private static final Pattern PATTERN_FOR_TS = Pattern.compile(",serverTimestamp=(\\d+),"); private static final String FORMAT_FOR_TIMESTAMP = ",serverTimestamp=%d,"; private static final Map<Class<? extends Exception>, SQLExceptionCode> errorcodeMap @@ -127,6 +129,7 @@ public class ServerUtil { } private static SQLException parseRemoteException(Throwable t) { + String message = t.getLocalizedMessage(); if (message != null) { // If the message matches the standard pattern, recover the SQLException and throw it. @@ -134,6 +137,10 @@ public class ServerUtil { if (matcher.find()) { int statusCode = Integer.parseInt(matcher.group(1)); SQLExceptionCode code = SQLExceptionCode.fromErrorCode(statusCode); + if(code.equals(SQLExceptionCode.HASH_JOIN_CACHE_NOT_FOUND)){ + Matcher m = HASH_JOIN_EXCEPTION_PATTERN.matcher(t.getLocalizedMessage()); + if (m.find()) { return new HashJoinCacheNotFoundException(Long.parseLong(m.group(1))); } + } return new SQLExceptionInfo.Builder(code).setMessage(matcher.group()).setRootCause(t).build().buildException(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index fe6d847..d32c443 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -632,6 +632,7 @@ public abstract class BaseTest { conf.setInt("hbase.assignment.zkevent.workers", 5); conf.setInt("hbase.assignment.threads.max", 5); conf.setInt("hbase.catalogjanitor.interval", 5000); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); conf.setInt(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, 1); return conf; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java index a0696c0..3980bc6 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java @@ -482,7 +482,7 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest { return null; } - }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false); + }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false, null); List<KeyRange> keyRanges = parallelIterators.getSplits(); return keyRanges; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7865a59b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java index 6afc796..0129eda 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java @@ -55,6 +55,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTableInterface; @@ -74,6 +75,7 @@ import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService; +import org.apache.phoenix.end2end.index.PartialIndexRebuilderIT.WriteFailingRegionObserver; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.expression.AndExpression; import org.apache.phoenix.expression.ByteBasedLikeExpression; @@ -937,5 +939,31 @@ public class TestUtil { assertTrue(rs.next()); return rs.getLong(1); } + + public static void addCoprocessor(Connection conn, String tableName, Class coprocessorClass) throws Exception { + int priority = QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY + 100; + ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); + HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName)); + if (!descriptor.getCoprocessors().contains(coprocessorClass.getName())) { + descriptor.addCoprocessor(coprocessorClass.getName(), null, priority, null); + }else{ + return; + } + int numTries = 10; + try (HBaseAdmin admin = services.getAdmin()) { + admin.modifyTable(Bytes.toBytes(tableName), descriptor); + while (!admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor) + && numTries > 0) { + numTries--; + if (numTries == 0) { + throw new Exception( + "Check to detect if delaying co-processor was added failed after " + + numTries + " retries."); + } + Thread.sleep(1000); + } + } + } + }
