Repository: phoenix Updated Branches: refs/heads/4.x-cdh5.13 036adf3b8 -> e63a313b1 (forced update)
PHOENIX-3163 Split during global index creation may cause ERROR 201 error (Sergey Soldatov) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3768da34 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3768da34 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3768da34 Branch: refs/heads/4.x-cdh5.13 Commit: 3768da34b9bea633cc98a00ea3f3facdcdb44eb1 Parents: c892e5c Author: James Taylor <jtay...@salesforce.com> Authored: Thu May 10 12:31:58 2018 -0700 Committer: James Taylor <jtay...@salesforce.com> Committed: Thu May 10 13:17:56 2018 -0700 ---------------------------------------------------------------------- .../phoenix/compile/StatementContext.java | 9 +++ .../apache/phoenix/compile/UpsertCompiler.java | 1 + .../phoenix/iterate/TableResultIterator.java | 71 +++++++++++--------- 3 files changed, 50 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/3768da34/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java index 3e5c8f2..3ea5dd5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java @@ -85,6 +85,7 @@ public class StatementContext { private final ReadMetricQueue readMetricsQueue; private final OverAllQueryMetrics overAllQueryMetrics; private QueryLogger queryLogger; + private boolean isClientSideUpsertSelect; public StatementContext(PhoenixStatement statement) { this(statement, new Scan()); @@ -316,5 +317,13 @@ public class StatementContext { public QueryLogger getQueryLogger() { return queryLogger; } + + public boolean isClientSideUpsertSelect() { + return isClientSideUpsertSelect; + } + + public void setClientSideUpsertSelect(boolean isClientSideUpsertSelect) { + this.isClientSideUpsertSelect = isClientSideUpsertSelect; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3768da34/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 22119a3..30f0c18 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -1252,6 +1252,7 @@ public class UpsertCompiler { this.useServerTimestamp = useServerTimestamp; this.maxSize = maxSize; this.maxSizeBytes = maxSizeBytes; + queryPlan.getContext().setClientSideUpsertSelect(true); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/3768da34/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index f6902cc..8c80c28 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException; @@ -92,7 +91,7 @@ public class TableResultIterator implements ResultIterator { @GuardedBy("renewLeaseLock") private long renewLeaseTime = 0; - + private final Lock renewLeaseLock = new ReentrantLock(); private int retry; @@ -114,12 +113,12 @@ public class TableResultIterator implements ResultIterator { public static enum RenewLeaseStatus { RENEWED, NOT_RENEWED, CLOSED, UNINITIALIZED, THRESHOLD_NOT_REACHED, LOCK_NOT_ACQUIRED, NOT_SUPPORTED }; - + public TableResultIterator(MutationState mutationState, Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException { this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, null); } - + public TableResultIterator(MutationState mutationState, Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper,Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { this.scan = scan; @@ -133,7 +132,7 @@ public class TableResultIterator implements ResultIterator { this.hashCacheClient = new HashCacheClient(plan.getContext().getConnection()); this.caches = caches; this.retry=plan.getContext().getConnection().getQueryServices().getProps() - .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES); + .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES); } @Override @@ -156,7 +155,7 @@ public class TableResultIterator implements ResultIterator { } } - + @Override public Tuple next() throws SQLException { try { @@ -172,7 +171,7 @@ public class TableResultIterator implements ResultIterator { try { throw ServerUtil.parseServerException(e); } catch(StaleRegionBoundaryCacheException | HashJoinCacheNotFoundException e1) { - if(ScanUtil.isNonAggregateScan(scan)) { + if(ScanUtil.isNonAggregateScan(scan) && plan.getContext().getAggregationManager().isEmpty()) { // For non aggregate queries if we get stale region boundary exception we can // continue scanning from the next value of lasted fetched result. Scan newScan = ScanUtil.newScan(scan); @@ -189,34 +188,44 @@ public class TableResultIterator implements ResultIterator { } } plan.getContext().getConnection().getQueryServices().clearTableRegionCache(htable.getTableName()); - if (e1 instanceof HashJoinCacheNotFoundException) { - logger.debug( - "Retrying when Hash Join cache is not found on the server ,by sending the cache again"); - if (retry <= 0) { - throw e1; - } - retry--; - try { - Long cacheId = ((HashJoinCacheNotFoundException) e1).getCacheId(); + if (e1 instanceof HashJoinCacheNotFoundException) { + logger.debug( + "Retrying when Hash Join cache is not found on the server ,by sending the cache again"); + if (retry <= 0) { + throw e1; + } + retry--; + try { + Long cacheId = ((HashJoinCacheNotFoundException) e1).getCacheId(); - ServerCache cache = caches == null ? null : - caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId))); + ServerCache cache = caches == null ? null : + caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId))); - if (!hashCacheClient.addHashCacheToServer(newScan.getStartRow(), - cache, plan.getTableRef().getTable())) { - throw e1; - } - this.scanIterator = ((BaseQueryPlan) plan).iterator(caches, scanGrouper, newScan); + if (!hashCacheClient.addHashCacheToServer(newScan.getStartRow(), + cache, plan.getTableRef().getTable())) { + throw e1; + } + this.scanIterator = ((BaseQueryPlan) plan).iterator(caches, scanGrouper, newScan); - } catch (Exception e2) { - throw new SQLException(e2); - } - } else { - this.scanIterator = plan.iterator(scanGrouper, newScan); - } + } catch (Exception ex) { + throw ServerUtil.parseServerException(ex); + } + } else { + try { + if(plan.getContext().isClientSideUpsertSelect()) { + if(ScanUtil.isLocalIndex(newScan)) { + throw e; + } + this.scanIterator = + new ScanningResultIterator(htable.getScanner(newScan), newScan, scanMetricsHolder); + } else { + this.scanIterator = plan.iterator(scanGrouper, newScan); + } + } catch (IOException ex) { + throw ServerUtil.parseServerException(ex); + } + } lastTuple = scanIterator.next(); - } else { - throw e; } } }