Repository: phoenix Updated Branches: refs/heads/4.x-HBase-0.98 f884b7625 -> 6259055cc
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 8037251..ab41e37 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -20,7 +20,6 @@ package org.apache.phoenix.coprocessor; import java.io.IOException; import java.util.List; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -35,7 +34,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.execute.TupleProjector; - import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.iterate.NonAggregateRegionScannerFactory; http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinCacheNotFoundException.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinCacheNotFoundException.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinCacheNotFoundException.java new file mode 100644 index 0000000..6d67348 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinCacheNotFoundException.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.coprocessor; + +import java.sql.SQLException; + +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; + +public class HashJoinCacheNotFoundException extends SQLException{ + private static final long serialVersionUID = 1L; + private Long cacheId; + private static SQLExceptionCode ERROR_CODE = SQLExceptionCode.HASH_JOIN_CACHE_NOT_FOUND; + public HashJoinCacheNotFoundException() { + this(null); + } + + public HashJoinCacheNotFoundException(Long cacheId) { + super(new SQLExceptionInfo.Builder(ERROR_CODE).setMessage("joinId: " + cacheId + + ". The cache might have expired and have been removed.").build().toString(), + ERROR_CODE.getSQLState(), ERROR_CODE.getErrorCode(), null); + this.cacheId=cacheId; + } + + public Long getCacheId(){ + return this.cacheId; + } + + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java index 3de00ae..611446d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java @@ -45,6 +45,7 @@ import org.apache.phoenix.schema.ValueBitSet; import org.apache.phoenix.schema.tuple.PositionBasedResultTuple; import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.TupleUtil; @@ -96,10 +97,11 @@ public class HashJoinRegionScanner implements RegionScanner { continue; } HashCache hashCache = (HashCache)cache.getServerCache(joinId); - if (hashCache == null) - throw new DoNotRetryIOException("Could not find hash cache for joinId: " - + Bytes.toString(joinId.get(), joinId.getOffset(), joinId.getLength()) - + ". The cache might have expired and have been removed."); + if (hashCache == null) { + Exception cause = new HashJoinCacheNotFoundException( + Bytes.toLong(ByteUtil.copyKeyBytesIfNecessary(joinId))); + throw new DoNotRetryIOException(cause.getMessage(), cause); + } hashCaches[i] = hashCache; tempSrcBitSet[i] = ValueBitSet.newInstance(joinInfo.getSchemas()[i]); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index ff4a35c..c9dce27 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -439,7 +439,8 @@ public enum SQLExceptionCode { " of connections to the target cluster."), MAX_MUTATION_SIZE_EXCEEDED(729, "LIM01", "MutationState size is bigger than maximum allowed number of rows"), - MAX_MUTATION_SIZE_BYTES_EXCEEDED(730, "LIM02", "MutationState size is bigger than maximum allowed number of bytes"); + MAX_MUTATION_SIZE_BYTES_EXCEEDED(730, "LIM02", "MutationState size is bigger than maximum allowed number of bytes"), + HASH_JOIN_CACHE_NOT_FOUND(900, "HJ01", "Hash Join cache not found"); private final int errorCode; private final String sqlState; http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index 2cdaac7..74c8d39 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@ -21,9 +21,10 @@ package org.apache.phoenix.execute; import java.sql.SQLException; import java.util.Collections; import java.util.List; +import java.util.Map; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.compile.QueryPlan; @@ -36,6 +37,7 @@ import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.OrderByExpression; import org.apache.phoenix.expression.RowKeyExpression; import org.apache.phoenix.expression.aggregator.Aggregators; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.iterate.AggregatingResultIterator; import org.apache.phoenix.iterate.BaseResultIterators; import org.apache.phoenix.iterate.ConcatResultIterator; @@ -185,7 +187,7 @@ public class AggregatePlan extends BaseQueryPlan { } @Override - protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { + protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { if (groupBy.isEmpty()) { UngroupedAggregateRegionObserver.serializeIntoScan(scan); } else { @@ -221,8 +223,8 @@ public class AggregatePlan extends BaseQueryPlan { } } BaseResultIterators iterators = isSerial - ? new SerialIterators(this, null, null, wrapParallelIteratorFactory(), scanGrouper, scan) - : new ParallelIterators(this, null, wrapParallelIteratorFactory(), scan, false); + ? new SerialIterators(this, null, null, wrapParallelIteratorFactory(), scanGrouper, scan, caches) + : new ParallelIterators(this, null, wrapParallelIteratorFactory(), scan, false, caches); estimatedRows = iterators.getEstimatedRowCount(); estimatedSize = iterators.getEstimatedByteCount(); splits = iterators.getSplits(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index ac5235a..6f15570 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -24,6 +24,7 @@ import java.sql.ParameterMetaData; import java.sql.SQLException; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.commons.logging.Log; @@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableUtils; +import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.ExplainPlan; import org.apache.phoenix.compile.FromCompiler; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; @@ -46,6 +48,7 @@ import org.apache.phoenix.compile.WhereCompiler; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.ProjectedColumnExpression; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.iterate.DefaultParallelScanGrouper; import org.apache.phoenix.iterate.DelegateResultIterator; @@ -72,7 +75,6 @@ import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.LogUtil; -import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; import org.apache.phoenix.util.ScanUtil; import org.cloudera.htrace.TraceScope; @@ -196,20 +198,20 @@ public abstract class BaseQueryPlan implements QueryPlan { @Override public final ResultIterator iterator() throws SQLException { - return iterator(Collections.<SQLCloseable>emptyList(), DefaultParallelScanGrouper.getInstance(), null); + return iterator(DefaultParallelScanGrouper.getInstance()); } @Override public final ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException { - return iterator(Collections.<SQLCloseable>emptyList(), scanGrouper, null); + return iterator(scanGrouper, null); } @Override public final ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { - return iterator(Collections.<SQLCloseable>emptyList(), scanGrouper, scan); + return iterator(Collections.<ImmutableBytesPtr,ServerCache>emptyMap(), scanGrouper, scan); } - - private ResultIterator getWrappedIterator(final List<? extends SQLCloseable> dependencies, + + private ResultIterator getWrappedIterator(final Map<ImmutableBytesPtr,ServerCache> dependencies, ResultIterator iterator) { ResultIterator wrappedIterator = dependencies.isEmpty() ? iterator : new DelegateResultIterator(iterator) { @Override @@ -217,14 +219,15 @@ public abstract class BaseQueryPlan implements QueryPlan { try { super.close(); } finally { - SQLCloseables.closeAll(dependencies); + SQLCloseables.closeAll(dependencies.values()); } } }; return wrappedIterator; } - public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies, ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { + public final ResultIterator iterator(final Map<ImmutableBytesPtr,ServerCache> caches, + ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { if (scan == null) { scan = context.getScan(); } @@ -235,11 +238,11 @@ public abstract class BaseQueryPlan implements QueryPlan { * row to be scanned. */ if (context.getScanRanges() == ScanRanges.NOTHING && !getStatement().isAggregate()) { - return getWrappedIterator(dependencies, ResultIterator.EMPTY_ITERATOR); + return getWrappedIterator(caches, ResultIterator.EMPTY_ITERATOR); } if (tableRef == TableRef.EMPTY_TABLE_REF) { - return getWrappedIterator(dependencies, newIterator(scanGrouper, scan)); + return newIterator(scanGrouper, scan, caches); } // Set miscellaneous scan attributes. This is the last chance to set them before we @@ -338,7 +341,7 @@ public abstract class BaseQueryPlan implements QueryPlan { LOG.debug(LogUtil.addCustomAnnotations("Scan ready for iteration: " + scan, connection)); } - ResultIterator iterator = getWrappedIterator(dependencies, newIterator(scanGrouper, scan)); + ResultIterator iterator = newIterator(scanGrouper, scan, caches); if (LOG.isDebugEnabled()) { LOG.debug(LogUtil.addCustomAnnotations("Iterator ready: " + iterator, connection)); } @@ -462,7 +465,7 @@ public abstract class BaseQueryPlan implements QueryPlan { } } - abstract protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException; + abstract protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException; @Override public long getEstimatedSize() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java index 17c3cca..879aa61 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java @@ -24,6 +24,7 @@ import static org.apache.phoenix.util.NumberUtil.add; import java.sql.SQLException; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; @@ -51,6 +52,7 @@ import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.InListExpression; import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.expression.RowValueConstructorExpression; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.iterate.FilterResultIterator; import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; @@ -73,10 +75,10 @@ import org.apache.phoenix.schema.types.PArrayDataType; import org.apache.phoenix.schema.types.PBoolean; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PVarbinary; -import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; public class HashJoinPlan extends DelegateQueryPlan { @@ -88,7 +90,7 @@ public class HashJoinPlan extends DelegateQueryPlan { private final boolean recompileWhereClause; private final Set<TableRef> tableRefs; private final int maxServerCacheTimeToLive; - private final List<SQLCloseable> dependencies = Lists.newArrayList(); + private final Map<ImmutableBytesPtr,ServerCache> dependencies = Maps.newHashMap(); private HashCacheClient hashClient; private AtomicLong firstJobEndTime; private List<Expression> keyRangeExpressions; @@ -99,7 +101,7 @@ public class HashJoinPlan extends DelegateQueryPlan { public static HashJoinPlan create(SelectStatement statement, QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans) throws SQLException { if (!(plan instanceof HashJoinPlan)) - return new HashJoinPlan(statement, plan, joinInfo, subPlans, joinInfo == null, Collections.<SQLCloseable>emptyList()); + return new HashJoinPlan(statement, plan, joinInfo, subPlans, joinInfo == null, Collections.<ImmutableBytesPtr,ServerCache>emptyMap()); HashJoinPlan hashJoinPlan = (HashJoinPlan) plan; assert (hashJoinPlan.joinInfo == null && hashJoinPlan.delegate instanceof BaseQueryPlan); @@ -115,9 +117,9 @@ public class HashJoinPlan extends DelegateQueryPlan { } private HashJoinPlan(SelectStatement statement, - QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean recompileWhereClause, List<SQLCloseable> dependencies) throws SQLException { + QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean recompileWhereClause, Map<ImmutableBytesPtr,ServerCache> dependencies) throws SQLException { super(plan); - this.dependencies.addAll(dependencies); + this.dependencies.putAll(dependencies); this.statement = statement; this.joinInfo = joinInfo; this.subPlans = subPlans; @@ -182,7 +184,7 @@ public class HashJoinPlan extends DelegateQueryPlan { try { ServerCache result = futures.get(i).get(); if (result != null) { - dependencies.add(result); + dependencies.put(new ImmutableBytesPtr(result.getId()),result); } subPlans[i].postProcess(result, this); } catch (InterruptedException e) { @@ -198,7 +200,7 @@ public class HashJoinPlan extends DelegateQueryPlan { } } if (firstException != null) { - SQLCloseables.closeAllQuietly(dependencies); + SQLCloseables.closeAllQuietly(dependencies.values()); throw firstException; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java index 1853c45..781c07e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java @@ -21,13 +21,16 @@ import java.sql.SQLException; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.iterate.ParallelIteratorFactory; import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; @@ -37,6 +40,7 @@ import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.SQLCloseables; public class LiteralResultIterationPlan extends BaseQueryPlan { protected final Iterable<Tuple> tuples; @@ -71,7 +75,7 @@ public class LiteralResultIterationPlan extends BaseQueryPlan { } @Override - protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) + protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan, final Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { ResultIterator scanner = new ResultIterator() { private final Iterator<Tuple> tupleIterator = tuples.iterator(); @@ -81,7 +85,8 @@ public class LiteralResultIterationPlan extends BaseQueryPlan { @Override public void close() throws SQLException { - this.closed = true;; + SQLCloseables.closeAll(caches.values()); + this.closed = true; } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java index 2990b77..f5b1af0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java @@ -24,10 +24,12 @@ import static org.apache.phoenix.util.ScanUtil.isRoundRobinPossible; import java.sql.SQLException; import java.util.Collections; import java.util.List; +import java.util.Map; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.compile.RowProjector; @@ -35,6 +37,7 @@ import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.coprocessor.ScanRegionObserver; import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.iterate.BaseResultIterators; import org.apache.phoenix.iterate.ChunkedResultIterator; import org.apache.phoenix.iterate.ConcatResultIterator; @@ -206,7 +209,7 @@ public class ScanPlan extends BaseQueryPlan { } @Override - protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { + protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { // Set any scan attributes before creating the scanner, as it will be too late afterwards scan.setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, QueryConstants.TRUE); ResultIterator scanner; @@ -229,11 +232,11 @@ public class ScanPlan extends BaseQueryPlan { && isDataToScanWithinThreshold; BaseResultIterators iterators; if (isOffsetOnServer) { - iterators = new SerialIterators(this, perScanLimit, offset, parallelIteratorFactory, scanGrouper, scan); + iterators = new SerialIterators(this, perScanLimit, offset, parallelIteratorFactory, scanGrouper, scan, caches); } else if (isSerial) { - iterators = new SerialIterators(this, perScanLimit, null, parallelIteratorFactory, scanGrouper, scan); + iterators = new SerialIterators(this, perScanLimit, null, parallelIteratorFactory, scanGrouper, scan, caches); } else { - iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper, scan, initFirstScanOnly); + iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper, scan, initFirstScanOnly, caches); } estimatedRows = iterators.getEstimatedRowCount(); estimatedSize = iterators.getEstimatedByteCount(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 251908f..4b50ea9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -68,11 +68,13 @@ import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException; import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; @@ -82,11 +84,13 @@ import org.apache.phoenix.filter.DistinctPrefixFilter; import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.VersionUtil; +import org.apache.phoenix.join.HashCacheClient; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.parse.HintNode; import org.apache.phoenix.parse.HintNode.Hint; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.ColumnFamilyNotFoundException; @@ -135,7 +139,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result private static final Logger logger = LoggerFactory.getLogger(BaseResultIterators.class); private static final int ESTIMATED_GUIDEPOSTS_PER_REGION = 20; private static final int MIN_SEEK_TO_COLUMN_VERSION = VersionUtil.encodeVersion("0", "98", "12"); - private final List<List<Scan>> scans; private final List<KeyRange> splits; private final byte[] physicalTableName; @@ -150,6 +153,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result private boolean hasGuidePosts; private Scan scan; private boolean useStatsForParallelization; + protected Map<ImmutableBytesPtr,ServerCache> caches; static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() { @Override @@ -466,11 +470,12 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } } - public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer offset, ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { + public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer offset, ParallelScanGrouper scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(), plan.getStatement().getHint(), QueryUtil.getOffsetLimit(plan.getLimit(), plan.getOffset()), offset); this.plan = plan; this.scan = scan; + this.caches = caches; this.scanGrouper = scanGrouper; StatementContext context = plan.getContext(); // Clone MutationState as the one on the connection will change if auto commit is on @@ -846,7 +851,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numScans); ScanWrapper previousScan = new ScanWrapper(null); return getIterators(scans, services, isLocalIndex, allIterators, iterators, isReverse, maxQueryEndTime, - splits.size(), previousScan); + splits.size(), previousScan, context.getConnection().getQueryServices().getConfiguration() + .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES)); } class ScanWrapper { @@ -868,11 +874,12 @@ public abstract class BaseResultIterators extends ExplainTable implements Result private List<PeekingResultIterator> getIterators(List<List<Scan>> scan, ConnectionQueryServices services, boolean isLocalIndex, Queue<PeekingResultIterator> allIterators, List<PeekingResultIterator> iterators, - boolean isReverse, long maxQueryEndTime, int splitSize, ScanWrapper previousScan) throws SQLException { + boolean isReverse, long maxQueryEndTime, int splitSize, ScanWrapper previousScan, int retryCount) throws SQLException { boolean success = false; final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(splitSize); allFutures.add(futures); SQLException toThrow = null; + final HashCacheClient hashCacheClient = new HashCacheClient(context.getConnection()); int queryTimeOut = context.getStatement().getQueryTimeoutInMillis(); try { submitWork(scan, futures, allIterators, splitSize, isReverse, scanGrouper); @@ -902,7 +909,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } catch (ExecutionException e) { try { // Rethrow as SQLException throw ServerUtil.parseServerException(e); - } catch (StaleRegionBoundaryCacheException e2) { + } catch (StaleRegionBoundaryCacheException | HashJoinCacheNotFoundException e2){ /* * Note that a StaleRegionBoundaryCacheException could be thrown in multiple scenarios including splits, region * moves, table disabled, etc. See ServerUtil.parseServerException() for details. @@ -917,23 +924,35 @@ public abstract class BaseResultIterators extends ExplainTable implements Result throw new TableNotEnabledException(physicalTableName); } } - scanPairItr.remove(); // Catch only to try to recover from region boundary cache being out of date if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries services.clearTableRegionCache(physicalTableName); context.getOverallQueryMetrics().cacheRefreshedDueToSplits(); } + // Resubmit just this portion of work again + Scan oldScan = scanPair.getFirst(); + byte[] startKey = oldScan.getAttribute(SCAN_ACTUAL_START_ROW); + if(e2 instanceof HashJoinCacheNotFoundException){ + logger.debug( + "Retrying when Hash Join cache is not found on the server ,by sending the cache again"); + if(retryCount<=0){ + throw e2; + } + Long cacheId = ((HashJoinCacheNotFoundException)e2).getCacheId(); + if (!hashCacheClient.addHashCacheToServer(startKey, + caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId))), plan.getTableRef().getTable())) { throw e2; } + } concatIterators = recreateIterators(services, isLocalIndex, allIterators, iterators, isReverse, maxQueryEndTime, previousScan, - concatIterators, scanPair); + clearedCache, concatIterators,scanPairItr, scanPair, retryCount-1); } catch(ColumnFamilyNotFoundException cfnfe) { if (scanPair.getFirst().getAttribute(LOCAL_INDEX_BUILD) != null) { Thread.sleep(1000); concatIterators = recreateIterators(services, isLocalIndex, allIterators, iterators, isReverse, maxQueryEndTime, previousScan, - clearedCache, concatIterators, scanPairItr, scanPair); + clearedCache, concatIterators, scanPairItr, scanPair, retryCount); } } } @@ -991,7 +1010,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result boolean isLocalIndex, Queue<PeekingResultIterator> allIterators, List<PeekingResultIterator> iterators, boolean isReverse, long maxQueryEndTime, ScanWrapper previousScan, List<PeekingResultIterator> concatIterators, - Pair<Scan, Future<PeekingResultIterator>> scanPair) throws SQLException { + Pair<Scan, Future<PeekingResultIterator>> scanPair, int retryCount) throws SQLException { // Resubmit just this portion of work again Scan oldScan = scanPair.getFirst(); byte[] startKey = oldScan.getAttribute(SCAN_ACTUAL_START_ROW); @@ -1003,7 +1022,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result addIterator(iterators, concatIterators); concatIterators = Lists.newArrayList(); getIterators(newNestedScans, services, isLocalIndex, allIterators, iterators, isReverse, - maxQueryEndTime, newNestedScans.size(), previousScan); + maxQueryEndTime, newNestedScans.size(), previousScan, retryCount); return concatIterators; } @@ -1013,24 +1032,25 @@ public abstract class BaseResultIterators extends ExplainTable implements Result ScanWrapper previousScan, boolean clearedCache, List<PeekingResultIterator> concatIterators, Iterator<Pair<Scan, Future<PeekingResultIterator>>> scanPairItr, - Pair<Scan, Future<PeekingResultIterator>> scanPair) throws SQLException { + Pair<Scan, Future<PeekingResultIterator>> scanPair, int retryCount) throws SQLException { scanPairItr.remove(); concatIterators = recreateIterators(services, isLocalIndex, allIterators, iterators, isReverse, - maxQueryEndTime, previousScan, concatIterators, scanPair); + maxQueryEndTime, previousScan, concatIterators, scanPair, retryCount); return concatIterators; } @Override public void close() throws SQLException { - if (allFutures.isEmpty()) { - return; - } + // Don't call cancel on already started work, as it causes the HConnection // to get into a funk. Instead, just cancel queued work. boolean cancelledWork = false; try { + if (allFutures.isEmpty()) { + return; + } List<Future<PeekingResultIterator>> futuresToClose = Lists.newArrayListWithExpectedSize(getSplits().size()); for (List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures : allFutures) { for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) { @@ -1065,6 +1085,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } } } finally { + SQLCloseables.closeAllQuietly(caches.values()); + caches.clear(); if (cancelledWork) { context.getConnection().getQueryServices().getExecutor().purge(); } @@ -1174,4 +1196,4 @@ public abstract class BaseResultIterators extends ExplainTable implements Result return "ResultIterators [name=" + getName() + ",id=" + scanId + ",scans=" + scans + "]"; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java index b720b56..5d52625 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java @@ -18,10 +18,13 @@ package org.apache.phoenix.iterate; import java.sql.SQLException; +import java.util.Map; import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.monitoring.CombinableMetric; import org.apache.phoenix.schema.TableRef; @@ -29,8 +32,8 @@ public class DefaultTableResultIteratorFactory implements TableResultIteratorFac @Override public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan, - CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException { - return new TableResultIterator(mutationState, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper); + CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { + return new TableResultIterator(mutationState, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper, caches); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index 8c9b689..f5d226f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@ -22,13 +22,16 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_NUM_PARAL import java.sql.SQLException; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.job.JobManager.JobCallable; import org.apache.phoenix.monitoring.CombinableMetric; import org.apache.phoenix.monitoring.MetricType; @@ -55,16 +58,16 @@ public class ParallelIterators extends BaseResultIterators { private final ParallelIteratorFactory iteratorFactory; private final boolean initFirstScanOnly; - public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan, boolean initFirstScanOnly) + public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan, boolean initFirstScanOnly, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { - super(plan, perScanLimit, null, scanGrouper, scan); + super(plan, perScanLimit, null, scanGrouper, scan,caches); this.iteratorFactory = iteratorFactory; this.initFirstScanOnly = initFirstScanOnly; } - public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, Scan scan, boolean initOneScanPerRegion) + public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, Scan scan, boolean initOneScanPerRegion, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { - this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance(), scan, initOneScanPerRegion); + this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance(), scan, initOneScanPerRegion, caches); } @Override @@ -101,7 +104,7 @@ public class ParallelIterators extends BaseResultIterators { final Scan scan = scanLocation.getScan(); final CombinableMetric scanMetrics = readMetrics.allotMetric(MetricType.SCAN_BYTES, physicalTableName); final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(readMetrics, physicalTableName); - final TableResultIterator tableResultItr = context.getConnection().getTableResultIteratorFactory().newIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper); + final TableResultIterator tableResultItr = context.getConnection().getTableResultIteratorFactory().newIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper, caches); context.getConnection().addIteratorForLeaseRenewal(tableResultItr); Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java index d8f7f40..b13749c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java @@ -22,6 +22,7 @@ import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES; import java.sql.SQLException; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -29,8 +30,10 @@ import java.util.concurrent.Future; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.job.JobManager.JobCallable; import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder; @@ -59,9 +62,9 @@ public class SerialIterators extends BaseResultIterators { private final Integer offset; public SerialIterators(QueryPlan plan, Integer perScanLimit, Integer offset, - ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan) + ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { - super(plan, perScanLimit, offset, scanGrouper, scan); + super(plan, perScanLimit, offset, scanGrouper, scan, caches); this.offset = offset; // must be a offset or a limit specified or a SERIAL hint Preconditions.checkArgument( @@ -90,7 +93,7 @@ public class SerialIterators extends BaseResultIterators { Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() { @Override public PeekingResultIterator call() throws Exception { - PeekingResultIterator itr = new SerialIterator(finalScans, tableName, renewLeaseThreshold, offset); + PeekingResultIterator itr = new SerialIterator(finalScans, tableName, renewLeaseThreshold, offset, caches); return itr; } @@ -140,13 +143,15 @@ public class SerialIterators extends BaseResultIterators { private int index; private PeekingResultIterator currentIterator; private Integer remainingOffset; + private Map<ImmutableBytesPtr,ServerCache> caches; - private SerialIterator(List<Scan> flattenedScans, String tableName, long renewLeaseThreshold, Integer offset) throws SQLException { + private SerialIterator(List<Scan> flattenedScans, String tableName, long renewLeaseThreshold, Integer offset, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { this.scans = Lists.newArrayListWithExpectedSize(flattenedScans.size()); this.tableName = tableName; this.renewLeaseThreshold = renewLeaseThreshold; this.scans.addAll(flattenedScans); this.remainingOffset = offset; + this.caches = caches; if (this.remainingOffset != null) { // mark the last scan for offset purposes this.scans.get(this.scans.size() - 1).setAttribute(QueryConstants.LAST_SCAN, Bytes.toBytes(Boolean.TRUE)); @@ -173,7 +178,7 @@ public class SerialIterators extends BaseResultIterators { if (remainingOffset != null) { currentScan.setAttribute(BaseScannerRegionObserver.SCAN_OFFSET, PInteger.INSTANCE.toBytes(remainingOffset)); } - TableResultIterator itr = new TableResultIterator(mutationState, currentScan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), renewLeaseThreshold, plan, scanGrouper); + TableResultIterator itr = new TableResultIterator(mutationState, currentScan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), renewLeaseThreshold, plan, scanGrouper, caches); PeekingResultIterator peekingItr = iteratorFactory.newIterator(context, itr, currentScan, tableName, plan); Tuple tuple; if ((tuple = peekingItr.peek()) == null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index 39554d3..5a3b77b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -30,6 +30,7 @@ import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.UN import java.io.IOException; import java.sql.SQLException; import java.util.List; +import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -39,9 +40,16 @@ import org.apache.hadoop.hbase.client.AbstractClientScanner; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException; +import org.apache.phoenix.execute.BaseQueryPlan; import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.join.HashCacheClient; import org.apache.phoenix.monitoring.CombinableMetric; +import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; import org.apache.phoenix.schema.tuple.Tuple; @@ -50,6 +58,8 @@ import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; @@ -72,9 +82,9 @@ public class TableResultIterator implements ResultIterator { private final long renewLeaseThreshold; private final QueryPlan plan; private final ParallelScanGrouper scanGrouper; + private static final Logger logger = LoggerFactory.getLogger(TableResultIterator.class); private Tuple lastTuple = null; private ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - @GuardedBy("renewLeaseLock") private ResultIterator scanIterator; @@ -86,6 +96,10 @@ public class TableResultIterator implements ResultIterator { private final Lock renewLeaseLock = new ReentrantLock(); + private int retry; + private Map<ImmutableBytesPtr,ServerCache> caches; + private HashCacheClient hashCacheClient; + @VisibleForTesting // Exposed for testing. DON'T USE ANYWHERE ELSE! TableResultIterator() { this.scanMetrics = null; @@ -94,6 +108,8 @@ public class TableResultIterator implements ResultIterator { this.scan = null; this.plan = null; this.scanGrouper = null; + this.caches = null; + this.retry = 0; } public static enum RenewLeaseStatus { @@ -102,6 +118,11 @@ public class TableResultIterator implements ResultIterator { public TableResultIterator(MutationState mutationState, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException { + this(mutationState, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper, null); + } + + public TableResultIterator(MutationState mutationState, Scan scan, CombinableMetric scanMetrics, + long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper,Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { this.scan = scan; this.scanMetrics = scanMetrics; this.plan = plan; @@ -110,6 +131,10 @@ public class TableResultIterator implements ResultIterator { this.scanIterator = UNINITIALIZED_SCANNER; this.renewLeaseThreshold = renewLeaseThreshold; this.scanGrouper = scanGrouper; + this.hashCacheClient = new HashCacheClient(plan.getContext().getConnection()); + this.caches = caches; + this.retry=plan.getContext().getConnection().getQueryServices().getProps() + .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES); } @Override @@ -147,7 +172,7 @@ public class TableResultIterator implements ResultIterator { } catch (SQLException e) { try { throw ServerUtil.parseServerException(e); - } catch(StaleRegionBoundaryCacheException e1) { + } catch(StaleRegionBoundaryCacheException | HashJoinCacheNotFoundException e1) { if(ScanUtil.isNonAggregateScan(scan)) { // For non aggregate queries if we get stale region boundary exception we can // continue scanning from the next value of lasted fetched result. @@ -165,8 +190,28 @@ public class TableResultIterator implements ResultIterator { } } plan.getContext().getConnection().getQueryServices().clearTableRegionCache(htable.getTableName()); - this.scanIterator = - plan.iterator(scanGrouper, newScan); + if (e1 instanceof HashJoinCacheNotFoundException) { + logger.debug( + "Retrying when Hash Join cache is not found on the server ,by sending the cache again"); + if (retry <= 0) { + throw e1; + } + retry--; + try { + Long cacheId = ((HashJoinCacheNotFoundException) e1).getCacheId(); + if (!hashCacheClient.addHashCacheToServer(newScan.getStartRow(), + caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId))), + plan.getTableRef().getTable())) { + throw e1; + } + this.scanIterator = ((BaseQueryPlan) plan).iterator(caches, scanGrouper, newScan); + + } catch (Exception e2) { + throw new SQLException(e2); + } + } else { + this.scanIterator = plan.iterator(scanGrouper, newScan); + } lastTuple = scanIterator.next(); } else { throw e; http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java index 8d7b54d..df451b1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java @@ -18,13 +18,16 @@ package org.apache.phoenix.iterate; import java.sql.SQLException; +import java.util.Map; import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.monitoring.CombinableMetric; import org.apache.phoenix.schema.TableRef; public interface TableResultIteratorFactory { - public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException; + public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java index 32d0469..2ec509c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java @@ -36,6 +36,7 @@ import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PDataType; @@ -56,6 +57,7 @@ import com.google.common.collect.Lists; */ public class HashCacheClient { private final ServerCacheClient serverCache; + /** * Construct client used to create a serialized cached snapshot of a table and send it to each region server * for caching during hash join processing. @@ -81,7 +83,22 @@ public class HashCacheClient { */ ImmutableBytesWritable ptr = new ImmutableBytesWritable(); serialize(ptr, iterator, estimatedSize, onExpressions, singleValueOnly, keyRangeRhsExpression, keyRangeRhsValues); - return serverCache.addServerCache(keyRanges, ptr, ByteUtil.EMPTY_BYTE_ARRAY, new HashCacheFactory(), cacheUsingTableRef); + ServerCache cache = serverCache.addServerCache(keyRanges, ptr, ByteUtil.EMPTY_BYTE_ARRAY, new HashCacheFactory(), cacheUsingTableRef, true); + return cache; + } + + /** + * Should only be used to resend the hash table cache to the regionserver. + * + * @param startkeyOfRegion start key of any region hosted on a regionserver which needs hash cache + * @param cacheId Id of the cache which needs to be sent + * @param pTable + * @return + * @throws Exception + */ + public boolean addHashCacheToServer(byte[] startkeyOfRegion, ServerCache cache, PTable pTable) throws Exception{ + if (cache == null) { return false; } + return serverCache.addServerCache(startkeyOfRegion, cache, new HashCacheFactory(), ByteUtil.EMPTY_BYTE_ARRAY, pTable); } private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, Expression keyRangeRhsExpression, List<Expression> keyRangeRhsValues) throws SQLException { @@ -179,4 +196,5 @@ public class HashCacheClient { // might be coerced later. return new RowValueConstructorExpression(values, false); } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java index e7487a1..7607388 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java @@ -399,4 +399,6 @@ public interface QueryConstants { public static final byte[] OFFSET_COLUMN = "c_offset".getBytes(); public static final String LAST_SCAN = "LAST_SCAN"; public static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes(); + public static final String HASH_JOIN_CACHE_RETRIES = "hashjoin.client.retries.number"; + public static final int DEFAULT_HASH_JOIN_CACHE_RETRIES = 5; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java index 44502a5..d11f3a2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java @@ -25,6 +25,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.util.Arrays; import java.util.Comparator; +import java.util.List; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -577,4 +578,18 @@ public class ByteUtil { throw new IllegalArgumentException("Unknown operator " + op); } } + + public static boolean contains(List<byte[]> keys, byte[] key) { + for (byte[] k : keys) { + if (Arrays.equals(k, key)) { return true; } + } + return false; + } + + public static boolean contains(List<ImmutableBytesPtr> keys, ImmutableBytesPtr key) { + for (ImmutableBytesPtr k : keys) { + if (key.compareTo(k) == 0) { return true; } + } + return false; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java index fe0937b..67b43a3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException; import org.apache.phoenix.exception.PhoenixIOException; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; @@ -45,6 +46,7 @@ public class ServerUtil { private static final String FORMAT = "ERROR %d (%s): %s"; private static final Pattern PATTERN = Pattern.compile("ERROR (\\d+) \\((\\w+)\\): (.*)"); + private static final Pattern HASH_JOIN_EXCEPTION_PATTERN = Pattern.compile("joinId: (-?\\d+)"); private static final Pattern PATTERN_FOR_TS = Pattern.compile(",serverTimestamp=(\\d+),"); private static final String FORMAT_FOR_TIMESTAMP = ",serverTimestamp=%d,"; private static final Map<Class<? extends Exception>, SQLExceptionCode> errorcodeMap @@ -132,6 +134,7 @@ public class ServerUtil { } private static SQLException parseRemoteException(Throwable t) { + String message = t.getLocalizedMessage(); if (message != null) { // If the message matches the standard pattern, recover the SQLException and throw it. @@ -139,6 +142,10 @@ public class ServerUtil { if (matcher.find()) { int statusCode = Integer.parseInt(matcher.group(1)); SQLExceptionCode code = SQLExceptionCode.fromErrorCode(statusCode); + if(code.equals(SQLExceptionCode.HASH_JOIN_CACHE_NOT_FOUND)){ + Matcher m = HASH_JOIN_EXCEPTION_PATTERN.matcher(t.getLocalizedMessage()); + if (m.find()) { return new HashJoinCacheNotFoundException(Long.parseLong(m.group(1))); } + } return new SQLExceptionInfo.Builder(code).setMessage(matcher.group()).setRootCause(t).build().buildException(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index dc6f4c9..556f195 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -620,6 +620,7 @@ public abstract class BaseTest { conf.setInt("hbase.assignment.zkevent.workers", 5); conf.setInt("hbase.assignment.threads.max", 5); conf.setInt("hbase.catalogjanitor.interval", 5000); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); conf.setInt(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, 1); return conf; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java index a0696c0..3980bc6 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java @@ -482,7 +482,7 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest { return null; } - }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false); + }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false, null); List<KeyRange> keyRanges = parallelIterators.getSplits(); return keyRanges; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java index 6afc796..d69d3ff 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java @@ -55,6 +55,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTableInterface; @@ -937,5 +938,33 @@ public class TestUtil { assertTrue(rs.next()); return rs.getLong(1); } + + public static void addCoprocessor(Connection conn, String tableName, Class coprocessorClass) throws Exception { + int priority = QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY + 100; + ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); + HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName)); + if (!descriptor.getCoprocessors().contains(coprocessorClass.getName())) { + descriptor.addCoprocessor(coprocessorClass.getName(), null, priority, null); + }else{ + return; + } + int numTries = 10; + try (HBaseAdmin admin = services.getAdmin()) { + admin.modifyTable(Bytes.toBytes(tableName), descriptor); + admin.disableTable(tableName); + admin.enableTable(tableName); + while (!admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor) + && numTries > 0) { + numTries--; + if (numTries == 0) { + throw new Exception( + "Check to detect if delaying co-processor was added failed after " + + numTries + " retries."); + } + Thread.sleep(1000); + } + } + } + }
