Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 f884b7625 -> 6259055cc


http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 8037251..ab41e37 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.coprocessor;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -35,7 +34,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.execute.TupleProjector;
-
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.iterate.NonAggregateRegionScannerFactory;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinCacheNotFoundException.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinCacheNotFoundException.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinCacheNotFoundException.java
new file mode 100644
index 0000000..6d67348
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinCacheNotFoundException.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.sql.SQLException;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+
+public class HashJoinCacheNotFoundException extends SQLException{
+    private static final long serialVersionUID = 1L;
+    private Long cacheId;
+    private static SQLExceptionCode ERROR_CODE = 
SQLExceptionCode.HASH_JOIN_CACHE_NOT_FOUND;
+    public HashJoinCacheNotFoundException() {
+        this(null);
+    }
+
+    public HashJoinCacheNotFoundException(Long cacheId) {
+        super(new SQLExceptionInfo.Builder(ERROR_CODE).setMessage("joinId: " + 
cacheId
+                + ". The cache might have expired and have been 
removed.").build().toString(),
+                ERROR_CODE.getSQLState(), ERROR_CODE.getErrorCode(), null);
+        this.cacheId=cacheId;
+    }
+    
+    public Long getCacheId(){
+        return this.cacheId;
+    }
+    
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index 3de00ae..611446d 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -45,6 +45,7 @@ import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.tuple.PositionBasedResultTuple;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TupleUtil;
 
@@ -96,10 +97,11 @@ public class HashJoinRegionScanner implements RegionScanner 
{
                 continue;
             }
             HashCache hashCache = (HashCache)cache.getServerCache(joinId);
-            if (hashCache == null)
-                throw new DoNotRetryIOException("Could not find hash cache for 
joinId: " 
-                        + Bytes.toString(joinId.get(), joinId.getOffset(), 
joinId.getLength()) 
-                        + ". The cache might have expired and have been 
removed.");
+            if (hashCache == null) {
+                Exception cause = new HashJoinCacheNotFoundException(
+                        
Bytes.toLong(ByteUtil.copyKeyBytesIfNecessary(joinId)));
+                throw new DoNotRetryIOException(cause.getMessage(), cause);
+            }
             hashCaches[i] = hashCache;
             tempSrcBitSet[i] = 
ValueBitSet.newInstance(joinInfo.getSchemas()[i]);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index ff4a35c..c9dce27 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -439,7 +439,8 @@ public enum SQLExceptionCode {
         " of connections to the target cluster."),
     
     MAX_MUTATION_SIZE_EXCEEDED(729, "LIM01", "MutationState size is bigger 
than maximum allowed number of rows"),
-    MAX_MUTATION_SIZE_BYTES_EXCEEDED(730, "LIM02", "MutationState size is 
bigger than maximum allowed number of bytes");
+    MAX_MUTATION_SIZE_BYTES_EXCEEDED(730, "LIM02", "MutationState size is 
bigger than maximum allowed number of bytes"), 
+    HASH_JOIN_CACHE_NOT_FOUND(900, "HJ01", "Hash Join cache not found");
 
     private final int errorCode;
     private final String sqlState;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 2cdaac7..74c8d39 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -21,9 +21,10 @@ package org.apache.phoenix.execute;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.QueryPlan;
@@ -36,6 +37,7 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.OrderByExpression;
 import org.apache.phoenix.expression.RowKeyExpression;
 import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.iterate.AggregatingResultIterator;
 import org.apache.phoenix.iterate.BaseResultIterators;
 import org.apache.phoenix.iterate.ConcatResultIterator;
@@ -185,7 +187,7 @@ public class AggregatePlan extends BaseQueryPlan {
     }
     
     @Override
-    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan 
scan) throws SQLException {
+    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan 
scan, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
         if (groupBy.isEmpty()) {
             UngroupedAggregateRegionObserver.serializeIntoScan(scan);
         } else {
@@ -221,8 +223,8 @@ public class AggregatePlan extends BaseQueryPlan {
             }
         }
         BaseResultIterators iterators = isSerial
-                ? new SerialIterators(this, null, null, 
wrapParallelIteratorFactory(), scanGrouper, scan)
-                : new ParallelIterators(this, null, 
wrapParallelIteratorFactory(), scan, false);
+                ? new SerialIterators(this, null, null, 
wrapParallelIteratorFactory(), scanGrouper, scan, caches)
+                : new ParallelIterators(this, null, 
wrapParallelIteratorFactory(), scan, false, caches);
         estimatedRows = iterators.getEstimatedRowCount();
         estimatedSize = iterators.getEstimatedByteCount();
         splits = iterators.getSplits();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index ac5235a..6f15570 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -24,6 +24,7 @@ import java.sql.ParameterMetaData;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
@@ -46,6 +48,7 @@ import org.apache.phoenix.compile.WhereCompiler;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ProjectedColumnExpression;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
 import org.apache.phoenix.iterate.DelegateResultIterator;
@@ -72,7 +75,6 @@ import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.LogUtil;
-import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.ScanUtil;
 import org.cloudera.htrace.TraceScope;
@@ -196,20 +198,20 @@ public abstract class BaseQueryPlan implements QueryPlan {
 
     @Override
     public final ResultIterator iterator() throws SQLException {
-        return iterator(Collections.<SQLCloseable>emptyList(), 
DefaultParallelScanGrouper.getInstance(), null);
+        return iterator(DefaultParallelScanGrouper.getInstance());
     }
     
     @Override
     public final ResultIterator iterator(ParallelScanGrouper scanGrouper) 
throws SQLException {
-        return iterator(Collections.<SQLCloseable>emptyList(), scanGrouper, 
null);
+        return iterator(scanGrouper, null);
     }
 
     @Override
     public final ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan 
scan) throws SQLException {
-        return iterator(Collections.<SQLCloseable>emptyList(), scanGrouper, 
scan);
+        return iterator(Collections.<ImmutableBytesPtr,ServerCache>emptyMap(), 
scanGrouper, scan);
     }
-    
-       private ResultIterator getWrappedIterator(final List<? extends 
SQLCloseable> dependencies,
+        
+       private ResultIterator getWrappedIterator(final 
Map<ImmutableBytesPtr,ServerCache> dependencies,
                        ResultIterator iterator) {
                ResultIterator wrappedIterator = dependencies.isEmpty() ? 
iterator : new DelegateResultIterator(iterator) {
                        @Override
@@ -217,14 +219,15 @@ public abstract class BaseQueryPlan implements QueryPlan {
                                try {
                                        super.close();
                                } finally {
-                                       SQLCloseables.closeAll(dependencies);
+                                       
SQLCloseables.closeAll(dependencies.values());
                                }
                        }
                };
                return wrappedIterator;
        }
 
-    public final ResultIterator iterator(final List<? extends SQLCloseable> 
dependencies, ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+    public final ResultIterator iterator(final 
Map<ImmutableBytesPtr,ServerCache> caches,
+            ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
          if (scan == null) {
              scan = context.getScan();
          }
@@ -235,11 +238,11 @@ public abstract class BaseQueryPlan implements QueryPlan {
                 * row to be scanned.
                 */
         if (context.getScanRanges() == ScanRanges.NOTHING && 
!getStatement().isAggregate()) {
-            return getWrappedIterator(dependencies, 
ResultIterator.EMPTY_ITERATOR);
+        return getWrappedIterator(caches, ResultIterator.EMPTY_ITERATOR);
         }
         
         if (tableRef == TableRef.EMPTY_TABLE_REF) {
-            return getWrappedIterator(dependencies, newIterator(scanGrouper, 
scan));
+            return newIterator(scanGrouper, scan, caches);
         }
         
         // Set miscellaneous scan attributes. This is the last chance to set 
them before we
@@ -338,7 +341,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
                LOG.debug(LogUtil.addCustomAnnotations("Scan ready for 
iteration: " + scan, connection));
         }
         
-        ResultIterator iterator = getWrappedIterator(dependencies, 
newIterator(scanGrouper, scan));
+        ResultIterator iterator =  newIterator(scanGrouper, scan, caches);
         if (LOG.isDebugEnabled()) {
                LOG.debug(LogUtil.addCustomAnnotations("Iterator ready: " + 
iterator, connection));
         }
@@ -462,7 +465,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
         }
     }
 
-    abstract protected ResultIterator newIterator(ParallelScanGrouper 
scanGrouper, Scan scan) throws SQLException;
+    abstract protected ResultIterator newIterator(ParallelScanGrouper 
scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches) throws 
SQLException;
     
     @Override
     public long getEstimatedSize() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 17c3cca..879aa61 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -24,6 +24,7 @@ import static org.apache.phoenix.util.NumberUtil.add;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
@@ -51,6 +52,7 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.InListExpression;
 import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.expression.RowValueConstructorExpression;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.iterate.FilterResultIterator;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
@@ -73,10 +75,10 @@ import org.apache.phoenix.schema.types.PArrayDataType;
 import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PVarbinary;
-import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 public class HashJoinPlan extends DelegateQueryPlan {
@@ -88,7 +90,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
     private final boolean recompileWhereClause;
     private final Set<TableRef> tableRefs;
     private final int maxServerCacheTimeToLive;
-    private final List<SQLCloseable> dependencies = Lists.newArrayList();
+    private final Map<ImmutableBytesPtr,ServerCache> dependencies = 
Maps.newHashMap();
     private HashCacheClient hashClient;
     private AtomicLong firstJobEndTime;
     private List<Expression> keyRangeExpressions;
@@ -99,7 +101,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
     public static HashJoinPlan create(SelectStatement statement, 
             QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans) throws 
SQLException {
         if (!(plan instanceof HashJoinPlan))
-            return new HashJoinPlan(statement, plan, joinInfo, subPlans, 
joinInfo == null, Collections.<SQLCloseable>emptyList());
+            return new HashJoinPlan(statement, plan, joinInfo, subPlans, 
joinInfo == null, Collections.<ImmutableBytesPtr,ServerCache>emptyMap());
         
         HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
         assert (hashJoinPlan.joinInfo == null && hashJoinPlan.delegate 
instanceof BaseQueryPlan);
@@ -115,9 +117,9 @@ public class HashJoinPlan extends DelegateQueryPlan {
     }
     
     private HashJoinPlan(SelectStatement statement, 
-            QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean 
recompileWhereClause, List<SQLCloseable> dependencies) throws SQLException {
+            QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean 
recompileWhereClause, Map<ImmutableBytesPtr,ServerCache> dependencies) throws 
SQLException {
         super(plan);
-        this.dependencies.addAll(dependencies);
+        this.dependencies.putAll(dependencies);
         this.statement = statement;
         this.joinInfo = joinInfo;
         this.subPlans = subPlans;
@@ -182,7 +184,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
             try {
                 ServerCache result = futures.get(i).get();
                 if (result != null) {
-                    dependencies.add(result);
+                    dependencies.put(new 
ImmutableBytesPtr(result.getId()),result);
                 }
                 subPlans[i].postProcess(result, this);
             } catch (InterruptedException e) {
@@ -198,7 +200,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
             }
         }
         if (firstException != null) {
-            SQLCloseables.closeAllQuietly(dependencies);
+            SQLCloseables.closeAllQuietly(dependencies.values());
             throw firstException;
         }
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
index 1853c45..781c07e 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
@@ -21,13 +21,16 @@ import java.sql.SQLException;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
@@ -37,6 +40,7 @@ import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SQLCloseables;
 
 public class LiteralResultIterationPlan extends BaseQueryPlan {
     protected final Iterable<Tuple> tuples;
@@ -71,7 +75,7 @@ public class LiteralResultIterationPlan extends BaseQueryPlan 
{
     }
 
     @Override
-    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan 
scan)
+    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan 
scan, final Map<ImmutableBytesPtr,ServerCache> caches)
             throws SQLException {
         ResultIterator scanner = new ResultIterator() {
             private final Iterator<Tuple> tupleIterator = tuples.iterator();
@@ -81,7 +85,8 @@ public class LiteralResultIterationPlan extends BaseQueryPlan 
{
 
             @Override
             public void close() throws SQLException {
-                this.closed = true;;
+                SQLCloseables.closeAll(caches.values());
+                this.closed = true;
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 2990b77..f5b1af0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -24,10 +24,12 @@ import static 
org.apache.phoenix.util.ScanUtil.isRoundRobinPossible;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.RowProjector;
@@ -35,6 +37,7 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.ScanRegionObserver;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.iterate.BaseResultIterators;
 import org.apache.phoenix.iterate.ChunkedResultIterator;
 import org.apache.phoenix.iterate.ConcatResultIterator;
@@ -206,7 +209,7 @@ public class ScanPlan extends BaseQueryPlan {
     }
 
     @Override
-    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan 
scan) throws SQLException {
+    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan 
scan, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
         // Set any scan attributes before creating the scanner, as it will be 
too late afterwards
         scan.setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, 
QueryConstants.TRUE);
         ResultIterator scanner;
@@ -229,11 +232,11 @@ public class ScanPlan extends BaseQueryPlan {
                         && isDataToScanWithinThreshold; 
         BaseResultIterators iterators;
         if (isOffsetOnServer) {
-            iterators = new SerialIterators(this, perScanLimit, offset, 
parallelIteratorFactory, scanGrouper, scan);
+            iterators = new SerialIterators(this, perScanLimit, offset, 
parallelIteratorFactory, scanGrouper, scan, caches);
         } else if (isSerial) {
-            iterators = new SerialIterators(this, perScanLimit, null, 
parallelIteratorFactory, scanGrouper, scan);
+            iterators = new SerialIterators(this, perScanLimit, null, 
parallelIteratorFactory, scanGrouper, scan, caches);
         } else {
-            iterators = new ParallelIterators(this, perScanLimit, 
parallelIteratorFactory, scanGrouper, scan, initFirstScanOnly);
+            iterators = new ParallelIterators(this, perScanLimit, 
parallelIteratorFactory, scanGrouper, scan, initFirstScanOnly, caches);
         }
         estimatedRows = iterators.getEstimatedRowCount();
         estimatedSize = iterators.getEstimatedByteCount();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 251908f..4b50ea9 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -68,11 +68,13 @@ import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
 import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -82,11 +84,13 @@ import org.apache.phoenix.filter.DistinctPrefixFilter;
 import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
+import org.apache.phoenix.join.HashCacheClient;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
@@ -135,7 +139,6 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
        private static final Logger logger = 
LoggerFactory.getLogger(BaseResultIterators.class);
     private static final int ESTIMATED_GUIDEPOSTS_PER_REGION = 20;
     private static final int MIN_SEEK_TO_COLUMN_VERSION = 
VersionUtil.encodeVersion("0", "98", "12");
-
     private final List<List<Scan>> scans;
     private final List<KeyRange> splits;
     private final byte[] physicalTableName;
@@ -150,6 +153,7 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
     private boolean hasGuidePosts;
     private Scan scan;
     private boolean useStatsForParallelization;
+    protected Map<ImmutableBytesPtr,ServerCache> caches;
     
     static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new 
Function<HRegionLocation, KeyRange>() {
         @Override
@@ -466,11 +470,12 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
         }
     }
     
-    public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer 
offset, ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+    public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer 
offset, ParallelScanGrouper scanGrouper, Scan scan, 
Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
         super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), 
plan.getOrderBy(),
                 plan.getStatement().getHint(), 
QueryUtil.getOffsetLimit(plan.getLimit(), plan.getOffset()), offset);
         this.plan = plan;
         this.scan = scan;
+        this.caches = caches;
         this.scanGrouper = scanGrouper;
         StatementContext context = plan.getContext();
         // Clone MutationState as the one on the connection will change if 
auto commit is on
@@ -846,7 +851,8 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
         List<PeekingResultIterator> iterators = new 
ArrayList<PeekingResultIterator>(numScans);
         ScanWrapper previousScan = new ScanWrapper(null);
         return getIterators(scans, services, isLocalIndex, allIterators, 
iterators, isReverse, maxQueryEndTime,
-                splits.size(), previousScan);
+                splits.size(), previousScan, 
context.getConnection().getQueryServices().getConfiguration()
+                        .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, 
QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES));
     }
 
     class ScanWrapper {
@@ -868,11 +874,12 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
 
     private List<PeekingResultIterator> getIterators(List<List<Scan>> scan, 
ConnectionQueryServices services,
             boolean isLocalIndex, Queue<PeekingResultIterator> allIterators, 
List<PeekingResultIterator> iterators,
-            boolean isReverse, long maxQueryEndTime, int splitSize, 
ScanWrapper previousScan) throws SQLException {
+            boolean isReverse, long maxQueryEndTime, int splitSize, 
ScanWrapper previousScan, int retryCount) throws SQLException {
         boolean success = false;
         final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = 
Lists.newArrayListWithExpectedSize(splitSize);
         allFutures.add(futures);
         SQLException toThrow = null;
+        final HashCacheClient hashCacheClient = new 
HashCacheClient(context.getConnection());
         int queryTimeOut = context.getStatement().getQueryTimeoutInMillis();
         try {
             submitWork(scan, futures, allIterators, splitSize, isReverse, 
scanGrouper);
@@ -902,7 +909,7 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
                     } catch (ExecutionException e) {
                         try { // Rethrow as SQLException
                             throw ServerUtil.parseServerException(e);
-                        } catch (StaleRegionBoundaryCacheException e2) {
+                        } catch (StaleRegionBoundaryCacheException | 
HashJoinCacheNotFoundException e2){
                            /*
                             * Note that a StaleRegionBoundaryCacheException 
could be thrown in multiple scenarios including splits, region
                             * moves, table disabled, etc. See 
ServerUtil.parseServerException() for details. 
@@ -917,23 +924,35 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
                                     throw new 
TableNotEnabledException(physicalTableName);
                                 }
                             }
-                            scanPairItr.remove();
                             // Catch only to try to recover from region 
boundary cache being out of date
                             if (!clearedCache) { // Clear cache once so that 
we rejigger job based on new boundaries
                                 
services.clearTableRegionCache(physicalTableName);
                                 
context.getOverallQueryMetrics().cacheRefreshedDueToSplits();
                             }
+                            // Resubmit just this portion of work again
+                            Scan oldScan = scanPair.getFirst();
+                            byte[] startKey = 
oldScan.getAttribute(SCAN_ACTUAL_START_ROW);
+                            if(e2 instanceof HashJoinCacheNotFoundException){
+                                logger.debug(
+                                        "Retrying when Hash Join cache is not 
found on the server ,by sending the cache again");
+                                if(retryCount<=0){
+                                    throw e2;
+                                }
+                                Long cacheId = 
((HashJoinCacheNotFoundException)e2).getCacheId();
+                                if 
(!hashCacheClient.addHashCacheToServer(startKey,
+                                        caches.get(new 
ImmutableBytesPtr(Bytes.toBytes(cacheId))), plan.getTableRef().getTable())) { 
throw e2; }
+                            }
                             concatIterators =
                                     recreateIterators(services, isLocalIndex, 
allIterators,
                                         iterators, isReverse, maxQueryEndTime, 
previousScan,
-                                        concatIterators, scanPair);
+                                        clearedCache, 
concatIterators,scanPairItr, scanPair, retryCount-1);
                         } catch(ColumnFamilyNotFoundException cfnfe) {
                             if 
(scanPair.getFirst().getAttribute(LOCAL_INDEX_BUILD) != null) {
                                 Thread.sleep(1000);
                                 concatIterators =
                                         recreateIterators(services, 
isLocalIndex, allIterators,
                                             iterators, isReverse, 
maxQueryEndTime, previousScan,
-                                            clearedCache, concatIterators, 
scanPairItr, scanPair);
+                                            clearedCache, concatIterators, 
scanPairItr, scanPair, retryCount);
                             }
                         }
                     }
@@ -991,7 +1010,7 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
             boolean isLocalIndex, Queue<PeekingResultIterator> allIterators,
             List<PeekingResultIterator> iterators, boolean isReverse, long 
maxQueryEndTime,
             ScanWrapper previousScan, List<PeekingResultIterator> 
concatIterators,
-            Pair<Scan, Future<PeekingResultIterator>> scanPair) throws 
SQLException {
+            Pair<Scan, Future<PeekingResultIterator>> scanPair, int 
retryCount) throws SQLException {
         // Resubmit just this portion of work again
         Scan oldScan = scanPair.getFirst();
         byte[] startKey = oldScan.getAttribute(SCAN_ACTUAL_START_ROW);
@@ -1003,7 +1022,7 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
         addIterator(iterators, concatIterators);
         concatIterators = Lists.newArrayList();
         getIterators(newNestedScans, services, isLocalIndex, allIterators, 
iterators, isReverse,
-                maxQueryEndTime, newNestedScans.size(), previousScan);
+                maxQueryEndTime, newNestedScans.size(), previousScan, 
retryCount);
         return concatIterators;
     }
 
@@ -1013,24 +1032,25 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
             ScanWrapper previousScan, boolean clearedCache,
             List<PeekingResultIterator> concatIterators,
             Iterator<Pair<Scan, Future<PeekingResultIterator>>> scanPairItr,
-            Pair<Scan, Future<PeekingResultIterator>> scanPair) throws 
SQLException {
+            Pair<Scan, Future<PeekingResultIterator>> scanPair, int 
retryCount) throws SQLException {
         scanPairItr.remove();
         concatIterators =
                 recreateIterators(services, isLocalIndex, allIterators, 
iterators, isReverse,
-                    maxQueryEndTime, previousScan, concatIterators, scanPair);
+                    maxQueryEndTime, previousScan, concatIterators, scanPair, 
retryCount);
         return concatIterators;
     }
     
 
     @Override
     public void close() throws SQLException {
-        if (allFutures.isEmpty()) {
-            return;
-        }
+        
         // Don't call cancel on already started work, as it causes the 
HConnection
         // to get into a funk. Instead, just cancel queued work.
         boolean cancelledWork = false;
         try {
+            if (allFutures.isEmpty()) {
+                return;
+            }
             List<Future<PeekingResultIterator>> futuresToClose = 
Lists.newArrayListWithExpectedSize(getSplits().size());
             for (List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures 
: allFutures) {
                 for (List<Pair<Scan,Future<PeekingResultIterator>>> 
futureScans : futures) {
@@ -1065,6 +1085,8 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
                 }
             }
         } finally {
+            SQLCloseables.closeAllQuietly(caches.values());
+            caches.clear();
             if (cancelledWork) {
                 
context.getConnection().getQueryServices().getExecutor().purge();
             }
@@ -1174,4 +1196,4 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
         return "ResultIterators [name=" + getName() + ",id=" + scanId + 
",scans=" + scans + "]";
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
index b720b56..5d52625 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
@@ -18,10 +18,13 @@
 package org.apache.phoenix.iterate;
 
 import java.sql.SQLException;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.monitoring.CombinableMetric;
 import org.apache.phoenix.schema.TableRef;
 
@@ -29,8 +32,8 @@ public class DefaultTableResultIteratorFactory implements 
TableResultIteratorFac
 
     @Override
     public TableResultIterator newIterator(MutationState mutationState, 
TableRef tableRef, Scan scan,
-            CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan 
plan, ParallelScanGrouper scanGrouper) throws SQLException {
-        return new TableResultIterator(mutationState, scan, scanMetrics, 
renewLeaseThreshold, plan, scanGrouper);
+            CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan 
plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> 
caches) throws SQLException {
+        return new TableResultIterator(mutationState, scan, scanMetrics, 
renewLeaseThreshold, plan, scanGrouper, caches);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 8c9b689..f5d226f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -22,13 +22,16 @@ import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_NUM_PARAL
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.job.JobManager.JobCallable;
 import org.apache.phoenix.monitoring.CombinableMetric;
 import org.apache.phoenix.monitoring.MetricType;
@@ -55,16 +58,16 @@ public class ParallelIterators extends BaseResultIterators {
     private final ParallelIteratorFactory iteratorFactory;
     private final boolean initFirstScanOnly;
     
-    public ParallelIterators(QueryPlan plan, Integer perScanLimit, 
ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan 
scan, boolean initFirstScanOnly)
+    public ParallelIterators(QueryPlan plan, Integer perScanLimit, 
ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan 
scan, boolean initFirstScanOnly, Map<ImmutableBytesPtr,ServerCache> caches)
             throws SQLException {
-        super(plan, perScanLimit, null, scanGrouper, scan);
+        super(plan, perScanLimit, null, scanGrouper, scan,caches);
         this.iteratorFactory = iteratorFactory;
         this.initFirstScanOnly = initFirstScanOnly;
     }   
     
-    public ParallelIterators(QueryPlan plan, Integer perScanLimit, 
ParallelIteratorFactory iteratorFactory, Scan scan, boolean 
initOneScanPerRegion)
+    public ParallelIterators(QueryPlan plan, Integer perScanLimit, 
ParallelIteratorFactory iteratorFactory, Scan scan, boolean 
initOneScanPerRegion, Map<ImmutableBytesPtr,ServerCache> caches)
             throws SQLException {
-        this(plan, perScanLimit, iteratorFactory, 
DefaultParallelScanGrouper.getInstance(), scan, initOneScanPerRegion);
+        this(plan, perScanLimit, iteratorFactory, 
DefaultParallelScanGrouper.getInstance(), scan, initOneScanPerRegion, caches);
     }  
 
     @Override
@@ -101,7 +104,7 @@ public class ParallelIterators extends BaseResultIterators {
             final Scan scan = scanLocation.getScan();
             final CombinableMetric scanMetrics = 
readMetrics.allotMetric(MetricType.SCAN_BYTES, physicalTableName);
             final TaskExecutionMetricsHolder taskMetrics = new 
TaskExecutionMetricsHolder(readMetrics, physicalTableName);
-            final TableResultIterator tableResultItr = 
context.getConnection().getTableResultIteratorFactory().newIterator(mutationState,
 tableRef, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper);
+            final TableResultIterator tableResultItr = 
context.getConnection().getTableResultIteratorFactory().newIterator(mutationState,
 tableRef, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper, caches);
             context.getConnection().addIteratorForLeaseRenewal(tableResultItr);
             Future<PeekingResultIterator> future = 
executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
                 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index d8f7f40..b13749c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -22,6 +22,7 @@ import static 
org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -29,8 +30,10 @@ import java.util.concurrent.Future;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.job.JobManager.JobCallable;
 import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
@@ -59,9 +62,9 @@ public class SerialIterators extends BaseResultIterators {
     private final Integer offset;
     
     public SerialIterators(QueryPlan plan, Integer perScanLimit, Integer 
offset,
-            ParallelIteratorFactory iteratorFactory, ParallelScanGrouper 
scanGrouper, Scan scan)
+            ParallelIteratorFactory iteratorFactory, ParallelScanGrouper 
scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches)
             throws SQLException {
-        super(plan, perScanLimit, offset, scanGrouper, scan);
+        super(plan, perScanLimit, offset, scanGrouper, scan, caches);
         this.offset = offset;
         // must be a offset or a limit specified or a SERIAL hint
         Preconditions.checkArgument(
@@ -90,7 +93,7 @@ public class SerialIterators extends BaseResultIterators {
             Future<PeekingResultIterator> future = 
executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
                 @Override
                 public PeekingResultIterator call() throws Exception {
-                    PeekingResultIterator itr = new SerialIterator(finalScans, 
tableName, renewLeaseThreshold, offset);
+                    PeekingResultIterator itr = new SerialIterator(finalScans, 
tableName, renewLeaseThreshold, offset, caches);
                     return itr;
                 }
 
@@ -140,13 +143,15 @@ public class SerialIterators extends BaseResultIterators {
         private int index;
         private PeekingResultIterator currentIterator;
         private Integer remainingOffset;
+        private Map<ImmutableBytesPtr,ServerCache> caches;
         
-        private SerialIterator(List<Scan> flattenedScans, String tableName, 
long renewLeaseThreshold, Integer offset) throws SQLException {
+        private SerialIterator(List<Scan> flattenedScans, String tableName, 
long renewLeaseThreshold, Integer offset, Map<ImmutableBytesPtr,ServerCache> 
caches) throws SQLException {
             this.scans = 
Lists.newArrayListWithExpectedSize(flattenedScans.size());
             this.tableName = tableName;
             this.renewLeaseThreshold = renewLeaseThreshold;
             this.scans.addAll(flattenedScans);
             this.remainingOffset = offset;
+            this.caches = caches;
             if (this.remainingOffset != null) {
                 // mark the last scan for offset purposes
                 this.scans.get(this.scans.size() - 
1).setAttribute(QueryConstants.LAST_SCAN, Bytes.toBytes(Boolean.TRUE));
@@ -173,7 +178,7 @@ public class SerialIterators extends BaseResultIterators {
                 if (remainingOffset != null) {
                     
currentScan.setAttribute(BaseScannerRegionObserver.SCAN_OFFSET, 
PInteger.INSTANCE.toBytes(remainingOffset));
                 }
-                TableResultIterator itr = new 
TableResultIterator(mutationState, currentScan, 
context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), 
renewLeaseThreshold, plan, scanGrouper);
+                TableResultIterator itr = new 
TableResultIterator(mutationState, currentScan, 
context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), 
renewLeaseThreshold, plan, scanGrouper, caches);
                 PeekingResultIterator peekingItr = 
iteratorFactory.newIterator(context, itr, currentScan, tableName, plan);
                 Tuple tuple;
                 if ((tuple = peekingItr.peek()) == null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 39554d3..5a3b77b 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -30,6 +30,7 @@ import static 
org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.UN
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -39,9 +40,16 @@ import org.apache.hadoop.hbase.client.AbstractClientScanner;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
+import org.apache.phoenix.execute.BaseQueryPlan;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.join.HashCacheClient;
 import org.apache.phoenix.monitoring.CombinableMetric;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.tuple.Tuple;
@@ -50,6 +58,8 @@ import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -72,9 +82,9 @@ public class TableResultIterator implements ResultIterator {
     private final long renewLeaseThreshold;
     private final QueryPlan plan;
     private final ParallelScanGrouper scanGrouper;
+    private static final Logger logger = 
LoggerFactory.getLogger(TableResultIterator.class);
     private Tuple lastTuple = null;
     private ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-
     @GuardedBy("renewLeaseLock")
     private ResultIterator scanIterator;
 
@@ -86,6 +96,10 @@ public class TableResultIterator implements ResultIterator {
     
     private final Lock renewLeaseLock = new ReentrantLock();
 
+    private int retry;
+    private Map<ImmutableBytesPtr,ServerCache> caches;
+    private HashCacheClient hashCacheClient;
+
     @VisibleForTesting // Exposed for testing. DON'T USE ANYWHERE ELSE!
     TableResultIterator() {
         this.scanMetrics = null;
@@ -94,6 +108,8 @@ public class TableResultIterator implements ResultIterator {
         this.scan = null;
         this.plan = null;
         this.scanGrouper = null;
+        this.caches = null;
+        this.retry = 0;
     }
 
     public static enum RenewLeaseStatus {
@@ -102,6 +118,11 @@ public class TableResultIterator implements ResultIterator 
{
 
     public TableResultIterator(MutationState mutationState, Scan scan, 
CombinableMetric scanMetrics,
             long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper 
scanGrouper) throws SQLException {
+        this(mutationState, scan, scanMetrics, renewLeaseThreshold, plan, 
scanGrouper, null);
+    }
+
+    public TableResultIterator(MutationState mutationState, Scan scan, 
CombinableMetric scanMetrics,
+            long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper 
scanGrouper,Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
         this.scan = scan;
         this.scanMetrics = scanMetrics;
         this.plan = plan;
@@ -110,6 +131,10 @@ public class TableResultIterator implements ResultIterator 
{
         this.scanIterator = UNINITIALIZED_SCANNER;
         this.renewLeaseThreshold = renewLeaseThreshold;
         this.scanGrouper = scanGrouper;
+        this.hashCacheClient = new 
HashCacheClient(plan.getContext().getConnection());
+        this.caches = caches;
+        
this.retry=plan.getContext().getConnection().getQueryServices().getProps()
+        .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, 
QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES);
     }
 
     @Override
@@ -147,7 +172,7 @@ public class TableResultIterator implements ResultIterator {
             } catch (SQLException e) {
                 try {
                     throw ServerUtil.parseServerException(e);
-                } catch(StaleRegionBoundaryCacheException e1) {
+                } catch(StaleRegionBoundaryCacheException | 
HashJoinCacheNotFoundException e1) {
                     if(ScanUtil.isNonAggregateScan(scan)) {
                         // For non aggregate queries if we get stale region 
boundary exception we can
                         // continue scanning from the next value of lasted 
fetched result.
@@ -165,8 +190,28 @@ public class TableResultIterator implements ResultIterator 
{
                             }
                         }
                         
plan.getContext().getConnection().getQueryServices().clearTableRegionCache(htable.getTableName());
-                        this.scanIterator =
-                                plan.iterator(scanGrouper, newScan);
+                                               if (e1 instanceof 
HashJoinCacheNotFoundException) {
+                                                       logger.debug(
+                                                                       
"Retrying when Hash Join cache is not found on the server ,by sending the cache 
again");
+                                                       if (retry <= 0) {
+                                                               throw e1;
+                                                       }
+                                                       retry--;
+                                                       try {
+                                                               Long cacheId = 
((HashJoinCacheNotFoundException) e1).getCacheId();
+                                                               if 
(!hashCacheClient.addHashCacheToServer(newScan.getStartRow(),
+                                                                               
caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId))),
+                                                                               
plan.getTableRef().getTable())) {
+                                                                       throw 
e1;
+                                                               }
+                                                               
this.scanIterator = ((BaseQueryPlan) plan).iterator(caches, scanGrouper, 
newScan);
+
+                                                       } catch (Exception e2) {
+                                                               throw new 
SQLException(e2);
+                                                       }
+                                               } else {
+                                                       this.scanIterator = 
plan.iterator(scanGrouper, newScan);
+                                               }
                         lastTuple = scanIterator.next();
                     } else {
                         throw e;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
index 8d7b54d..df451b1 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
@@ -18,13 +18,16 @@
 package org.apache.phoenix.iterate;
 
 import java.sql.SQLException;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.monitoring.CombinableMetric;
 import org.apache.phoenix.schema.TableRef;
 
 public interface TableResultIteratorFactory {
-    public TableResultIterator newIterator(MutationState mutationState, 
TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long 
renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws 
SQLException;        
+    public TableResultIterator newIterator(MutationState mutationState, 
TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long 
renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper, 
Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException;        
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
index 32d0469..2ec509c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
@@ -36,6 +36,7 @@ import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PDataType;
@@ -56,6 +57,7 @@ import com.google.common.collect.Lists;
  */
 public class HashCacheClient  {
     private final ServerCacheClient serverCache;
+
     /**
      * Construct client used to create a serialized cached snapshot of a table 
and send it to each region server
      * for caching during hash join processing.
@@ -81,7 +83,22 @@ public class HashCacheClient  {
          */
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         serialize(ptr, iterator, estimatedSize, onExpressions, 
singleValueOnly, keyRangeRhsExpression, keyRangeRhsValues);
-        return serverCache.addServerCache(keyRanges, ptr, 
ByteUtil.EMPTY_BYTE_ARRAY, new HashCacheFactory(), cacheUsingTableRef);
+        ServerCache cache = serverCache.addServerCache(keyRanges, ptr, 
ByteUtil.EMPTY_BYTE_ARRAY, new HashCacheFactory(), cacheUsingTableRef, true);
+        return cache;
+    }
+    
+    /**
+     * Should only be used to resend the hash table cache to the regionserver.
+     *  
+     * @param startkeyOfRegion start key of any region hosted on a 
regionserver which needs hash cache
+     * @param cache the hash cache which needs to be re-sent to the server
+     * @param pTable the table for which the hash cache is being sent
+     * @return true if the cache was successfully added, false if the cache is null
+     * @throws Exception if sending the cache to the region server fails
+     */
+    public boolean addHashCacheToServer(byte[] startkeyOfRegion, ServerCache 
cache, PTable pTable) throws Exception{
+        if (cache == null) { return false; }
+        return serverCache.addServerCache(startkeyOfRegion, cache, new 
HashCacheFactory(), ByteUtil.EMPTY_BYTE_ARRAY, pTable);
     }
     
     private void serialize(ImmutableBytesWritable ptr, ResultIterator 
iterator, long estimatedSize, List<Expression> onExpressions, boolean 
singleValueOnly, Expression keyRangeRhsExpression, List<Expression> 
keyRangeRhsValues) throws SQLException {
@@ -179,4 +196,5 @@ public class HashCacheClient  {
         // might be coerced later.
         return new RowValueConstructorExpression(values, false);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index e7487a1..7607388 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -399,4 +399,6 @@ public interface QueryConstants {
     public static final byte[] OFFSET_COLUMN = "c_offset".getBytes();
     public static final String LAST_SCAN = "LAST_SCAN";
     public static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes();
+    public static final String HASH_JOIN_CACHE_RETRIES = 
"hashjoin.client.retries.number";
+    public static final int DEFAULT_HASH_JOIN_CACHE_RETRIES = 5;
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
index 44502a5..d11f3a2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
@@ -25,6 +25,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.List;
 
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -577,4 +578,18 @@ public class ByteUtil {
             throw new IllegalArgumentException("Unknown operator " + op);
         }
     }
+    
+    public static boolean contains(List<byte[]> keys, byte[] key) {
+        for (byte[] k : keys) {
+            if (Arrays.equals(k, key)) { return true; }
+        }
+        return false;
+    }
+
+    public static boolean contains(List<ImmutableBytesPtr> keys, 
ImmutableBytesPtr key) {
+        for (ImmutableBytesPtr k : keys) {
+            if (key.compareTo(k) == 0) { return true; }
+        }
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
index fe0937b..67b43a3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.HTablePool;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
 import org.apache.phoenix.exception.PhoenixIOException;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -45,6 +46,7 @@ public class ServerUtil {
     
     private static final String FORMAT = "ERROR %d (%s): %s";
     private static final Pattern PATTERN = Pattern.compile("ERROR (\\d+) 
\\((\\w+)\\): (.*)");
+    private static final Pattern HASH_JOIN_EXCEPTION_PATTERN = 
Pattern.compile("joinId: (-?\\d+)");
     private static final Pattern PATTERN_FOR_TS = 
Pattern.compile(",serverTimestamp=(\\d+),");
     private static final String FORMAT_FOR_TIMESTAMP = ",serverTimestamp=%d,";
     private static final Map<Class<? extends Exception>, SQLExceptionCode> 
errorcodeMap
@@ -132,6 +134,7 @@ public class ServerUtil {
     }
 
     private static SQLException parseRemoteException(Throwable t) {
+        
         String message = t.getLocalizedMessage();
         if (message != null) {
             // If the message matches the standard pattern, recover the 
SQLException and throw it.
@@ -139,6 +142,10 @@ public class ServerUtil {
             if (matcher.find()) {
                 int statusCode = Integer.parseInt(matcher.group(1));
                 SQLExceptionCode code = 
SQLExceptionCode.fromErrorCode(statusCode);
+                if(code.equals(SQLExceptionCode.HASH_JOIN_CACHE_NOT_FOUND)){
+                    Matcher m = 
HASH_JOIN_EXCEPTION_PATTERN.matcher(t.getLocalizedMessage());
+                    if (m.find()) { return new 
HashJoinCacheNotFoundException(Long.parseLong(m.group(1))); }
+                }
                 return new 
SQLExceptionInfo.Builder(code).setMessage(matcher.group()).setRootCause(t).build().buildException();
             }
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index dc6f4c9..556f195 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -620,6 +620,7 @@ public abstract class BaseTest {
         conf.setInt("hbase.assignment.zkevent.workers", 5);
         conf.setInt("hbase.assignment.threads.max", 5);
         conf.setInt("hbase.catalogjanitor.interval", 5000);
+        conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
         conf.setInt(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, 1);
         return conf;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index a0696c0..3980bc6 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -482,7 +482,7 @@ public class ParallelIteratorsSplitTest extends 
BaseConnectionlessQueryTest {
                 return null;
             }
             
-        }, null, new 
SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()),
 context.getScan(), false);
+        }, null, new 
SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()),
 context.getScan(), false, null);
         List<KeyRange> keyRanges = parallelIterators.getSplits();
         return keyRanges;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java 
b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 6afc796..d69d3ff 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -55,6 +55,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -937,5 +938,33 @@ public class TestUtil {
         assertTrue(rs.next());
         return rs.getLong(1);
     }
+    
+    public static void addCoprocessor(Connection conn, String tableName, Class 
coprocessorClass) throws Exception {
+        int priority = QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY + 100;
+        ConnectionQueryServices services = 
conn.unwrap(PhoenixConnection.class).getQueryServices();
+        HTableDescriptor descriptor = 
services.getTableDescriptor(Bytes.toBytes(tableName));
+               if 
(!descriptor.getCoprocessors().contains(coprocessorClass.getName())) {
+                       descriptor.addCoprocessor(coprocessorClass.getName(), 
null, priority, null);
+               }else{
+                       return;
+               }
+        int numTries = 10;
+        try (HBaseAdmin admin = services.getAdmin()) {
+            admin.modifyTable(Bytes.toBytes(tableName), descriptor);
+            admin.disableTable(tableName);
+            admin.enableTable(tableName);
+            while 
(!admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor)
+                    && numTries > 0) {
+                numTries--;
+                if (numTries == 0) {
+                    throw new Exception(
+                            "Check to detect if delaying co-processor was 
added failed after "
+                                    + numTries + " retries.");
+                }
+                Thread.sleep(1000);
+            }
+        }
+    }
+
 
 }

Reply via email to