Repository: phoenix
Updated Branches:
  refs/heads/4.8-HBase-1.1 56eb4bd28 -> be90ecf24


PHOENIX-3159 CachingHTableFactory may close HTable during eviction even if it is getting used for writing by another thread


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/be90ecf2
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/be90ecf2
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/be90ecf2

Branch: refs/heads/4.8-HBase-1.1
Commit: be90ecf24db1064133fbe726e0d508a629d5d74d
Parents: 56eb4bd
Author: Ankit Singhal <ankitsingha...@gmail.com>
Authored: Mon Oct 3 19:14:47 2016 +0530
Committer: Ankit Singhal <ankitsingha...@gmail.com>
Committed: Mon Oct 3 19:14:47 2016 +0530

----------------------------------------------------------------------
 .../hbase/index/table/CachingHTableFactory.java | 104 ++++++++++++++++---
 .../index/table/CoprocessorHTableFactory.java   |   6 ++
 .../hbase/index/table/HTableFactory.java        |   4 +-
 .../hbase/index/write/IndexWriterUtils.java     |   3 +
 .../write/ParallelWriterIndexCommitter.java     |  21 ++--
 .../TrackingParallelWriterIndexCommitter.java   |  18 ++--
 .../hbase/index/write/FakeTableFactory.java     |   9 +-
 .../index/write/TestCachingHTableFactory.java   |  37 ++++---
 .../hbase/index/write/TestIndexWriter.java      |  24 +++--
 .../index/write/TestParalleIndexWriter.java     |  16 ++-
 .../write/TestParalleWriterIndexCommitter.java  |  15 ++-
 11 files changed, 197 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/be90ecf2/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java
index 0c06e2b..d0df5b3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java
@@ -17,18 +17,30 @@
  */
 package org.apache.phoenix.hbase.index.table;
 
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.HTABLE_KEEP_ALIVE_KEY;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITES_THREAD_MAX_PER_REGIONSERVER_KEY;
+
 import java.io.IOException;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.collections.map.LRUMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Bytes;
-
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.phoenix.execute.DelegateHTable;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 
+import com.google.common.annotations.VisibleForTesting;;
+
 /**
  * A simple cache that just uses usual GC mechanisms to cleanup unused {@link HTableInterface}s.
  * When requesting an {@link HTableInterface} via {@link #getTable}, you may get the same table as
@@ -47,7 +59,7 @@ public class CachingHTableFactory implements HTableFactory {
   public class HTableInterfaceLRUMap extends LRUMap {
 
     public HTableInterfaceLRUMap(int cacheSize) {
-      super(cacheSize);
+      super(cacheSize, true);
     }
 
     @Override
@@ -58,12 +70,18 @@ public class CachingHTableFactory implements HTableFactory {
             + " because it was evicted from the cache.");
       }
       try {
-        table.close();
+         synchronized (this) { // the whole operation of closing and checking the count should be atomic
+                                    // and should not conflict with getTable()
+          if (((CachedHTableWrapper)table).getReferenceCount() <= 0) {
+            table.close();
+            return true;
+          }
+        }
       } catch (IOException e) {
         LOG.info("Failed to correctly close HTable: " + Bytes.toString(table.getTableName())
             + " ignoring since being removed from queue.");
       }
-      return true;
+      return false;
     }
   }
 
@@ -73,38 +91,94 @@ public class CachingHTableFactory implements HTableFactory {
 
   private static final Log LOG = LogFactory.getLog(CachingHTableFactory.class);
   private static final String CACHE_SIZE_KEY = "index.tablefactory.cache.size";
-  private static final int DEFAULT_CACHE_SIZE = 10;
+  private static final int DEFAULT_CACHE_SIZE = 1000;
 
   private HTableFactory delegate;
 
   @SuppressWarnings("rawtypes")
   Map openTables;
+  private ThreadPoolExecutor pool;
 
-  public CachingHTableFactory(HTableFactory tableFactory, Configuration conf) {
-    this(tableFactory, getCacheSize(conf));
+  public CachingHTableFactory(HTableFactory tableFactory, Configuration conf, RegionCoprocessorEnvironment env) {
+    this(tableFactory, getCacheSize(conf), env);
   }
 
-  public CachingHTableFactory(HTableFactory factory, int cacheSize) {
+  public CachingHTableFactory(HTableFactory factory, int cacheSize, RegionCoprocessorEnvironment env) {
     this.delegate = factory;
     openTables = new HTableInterfaceLRUMap(cacheSize);
+        this.pool = new ThreadPoolExecutor(1,
+                env.getConfiguration().getInt(INDEX_WRITES_THREAD_MAX_PER_REGIONSERVER_KEY, Integer.MAX_VALUE),
+                env.getConfiguration().getInt(HTABLE_KEEP_ALIVE_KEY, 60), TimeUnit.SECONDS,
+                new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("CachedHtables"));
+        pool.allowCoreThreadTimeOut(true);
   }
-
+  
   @Override
   @SuppressWarnings("unchecked")
-  public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
+  public HTableInterface getTable(ImmutableBytesPtr tablename, ExecutorService pool) throws IOException {
     ImmutableBytesPtr tableBytes = new ImmutableBytesPtr(tablename);
     synchronized (openTables) {
-      HTableInterface table = (HTableInterface) openTables.get(tableBytes);
+      CachedHTableWrapper table = (CachedHTableWrapper) openTables.get(tableBytes);
       if (table == null) {
-        table = delegate.getTable(tablename);
+        table = new CachedHTableWrapper(delegate.getTable(tablename, pool));
         openTables.put(tableBytes, table);
       }
+      table.incrementReferenceCount();
       return table;
     }
   }
 
   @Override
-  public void shutdown() {
-    this.delegate.shutdown();
-  }
+    public void shutdown() {
+        this.delegate.shutdown();
+        this.pool.shutdown();
+        try {
+            boolean terminated = false;
+            do {
+                // wait until the pool has terminated
+                terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
+            } while (!terminated);
+        } catch (InterruptedException e) {
+            this.pool.shutdownNow();
+            LOG.warn("waitForTermination interrupted");
+        }
+    }
+
+    public static class CachedHTableWrapper extends DelegateHTable {
+
+        private AtomicInteger referenceCount = new AtomicInteger();
+
+        public CachedHTableWrapper(HTableInterface table) {
+            super(table);
+        }
+
+        @Override
+        public synchronized void close() throws IOException {
+            if (getReferenceCount() > 0) {
+                this.referenceCount.decrementAndGet();
+            } else {
+                // During LRU eviction
+                super.close();
+            }
+        }
+
+        public void incrementReferenceCount() {
+            this.referenceCount.incrementAndGet();
+        }
+
+        public int getReferenceCount() {
+            return this.referenceCount.get();
+        }
+
+    }
+
+    @Override
+    public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
+        return getTable(tablename, this.pool);
+    }
+    
+    @VisibleForTesting
+    public ThreadPoolExecutor getPool(){
+        return this.pool;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/be90ecf2/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
index ded618d..45e271d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.hbase.index.table;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.TableName;
@@ -36,6 +37,11 @@ public class CoprocessorHTableFactory implements HTableFactory {
     public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
         return this.e.getTable(TableName.valueOf(tablename.copyBytesIfNecessary()));
     }
+    
+    @Override
+    public HTableInterface getTable(ImmutableBytesPtr tablename,ExecutorService pool) throws IOException {
+        return this.e.getTable(TableName.valueOf(tablename.copyBytesIfNecessary()), pool);
+    }
 
     @Override
     public void shutdown() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/be90ecf2/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableFactory.java
index bef3d34..e6a2e60 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableFactory.java
@@ -19,9 +19,9 @@
 package org.apache.phoenix.hbase.index.table;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.hadoop.hbase.client.HTableInterface;
-
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 
 public interface HTableFactory {
@@ -29,4 +29,6 @@ public interface HTableFactory {
   public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException;
 
   public void shutdown();
+
+  public HTableInterface getTable(ImmutableBytesPtr tablename, ExecutorService pool) throws IOException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/be90ecf2/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
index b8b0079..6eb657d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
@@ -54,6 +54,9 @@ public class IndexWriterUtils {
 
   /** Configuration key that HBase uses to set the max number of threads for an HTable */
   public static final String HTABLE_THREAD_KEY = "hbase.htable.threads.max";
+   public static final String INDEX_WRITES_THREAD_MAX_PER_REGIONSERVER_KEY = "phoenix.index.writes.threads.max";
+   public static final String HTABLE_KEEP_ALIVE_KEY = "hbase.htable.threads.keepalivetime";
+   
   private IndexWriterUtils() {
     // private ctor for utilites
   }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/be90ecf2/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
index 0e6db58..1ab7338 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
@@ -21,14 +21,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
 import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
 import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner;
@@ -41,7 +37,6 @@ import org.apache.phoenix.hbase.index.table.HTableFactory;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.MetaDataUtil;
 
 import com.google.common.collect.Multimap;
 
@@ -59,7 +54,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
 
     public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.writer.threads.max";
     private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
-    private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = "index.writer.threads.keepalivetime";
+    public static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = "index.writer.threads.keepalivetime";
     private static final Log LOG = LogFactory.getLog(ParallelWriterIndexCommitter.class);
 
     private HTableFactory factory;
@@ -84,7 +79,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
                         new ThreadPoolBuilder(name, conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
                                 DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout(
                                 INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent,
-                CachingHTableFactory.getCacheSize(conf));
+                CachingHTableFactory.getCacheSize(conf),env);
         this.kvBuilder = KeyValueBuilder.get(env.getHBaseVersion());
     }
 
@@ -93,8 +88,8 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
      * <p>
      * Exposed for TESTING
      */
-    void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, int cacheSize) {
-        this.factory = new CachingHTableFactory(factory, cacheSize);
+    void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, int cacheSize, RegionCoprocessorEnvironment env) {
+        this.factory = new CachingHTableFactory(factory, cacheSize, env);
         this.pool = new QuickFailingTaskRunner(pool);
         this.stopped = stop;
     }
@@ -151,6 +146,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
                     }
+                    HTableInterface table = null;
                     try {
                                                if (allowLocalUpdates && env!=null) {
                                try {
@@ -165,7 +161,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
                                    }
                                }
                                                }
-                        HTableInterface table = factory.getTable(tableReference.get());
+                        table = factory.getTable(tableReference.get());
                         throwFailureIfDone();
                         int i = 0;
                         table.batch(mutations);
@@ -178,6 +174,11 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
                         Thread.currentThread().interrupt();
                         throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
                     }
+                    finally{
+                        if (table != null) {
+                            table.close();
+                        }
+                    }
                     return null;
                 }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/be90ecf2/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
index fc27f76..d244d66 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
@@ -23,13 +23,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.phoenix.hbase.index.CapturingAbortable;
 import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
 import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
@@ -48,7 +45,6 @@ import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
 import org.apache.phoenix.hbase.index.write.ParallelWriterIndexCommitter;
 import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.MetaDataUtil;
 
 import com.google.common.collect.Multimap;
 
@@ -95,7 +91,7 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
                         new ThreadPoolBuilder(name, conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
                                 DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout(
                                 INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent,
-                CachingHTableFactory.getCacheSize(conf));
+                CachingHTableFactory.getCacheSize(conf), env);
     }
 
     /**
@@ -103,9 +99,10 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
      * <p>
      * Exposed for TESTING
      */
-    void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, int cacheSize) {
+    void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, int cacheSize,
+            RegionCoprocessorEnvironment env) {
         this.pool = new WaitForCompletionTaskRunner(pool);
-        this.factory = new CachingHTableFactory(factory, cacheSize);
+        this.factory = new CachingHTableFactory(factory, cacheSize, env);
         this.abortable = new CapturingAbortable(abortable);
         this.stopped = stop;
     }
@@ -148,6 +145,7 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
                 @SuppressWarnings("deprecation")
                 @Override
                 public Boolean call() throws Exception {
+                    HTableInterface table = null;
                     try {
                         // this may have been queued, but there was an abort/stop so we try to early exit
                         throwFailureIfDone();
@@ -170,7 +168,7 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
                             LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
                         }
 
-                        HTableInterface table = factory.getTable(tableReference.get());
+                        table = factory.getTable(tableReference.get());
                         throwFailureIfDone();
                         table.batch(mutations);
                     } catch (InterruptedException e) {
@@ -179,6 +177,10 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
                         throw e;
                     } catch (Exception e) {
                         throw e;
+                    } finally {
+                        if (table != null) {
+                            table.close();
+                        }
                     }
                     return Boolean.TRUE;
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/be90ecf2/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/FakeTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/FakeTableFactory.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/FakeTableFactory.java
index 2b6be18..4483a7f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/FakeTableFactory.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/FakeTableFactory.java
@@ -20,9 +20,9 @@ package org.apache.phoenix.hbase.index.write;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.hadoop.hbase.client.HTableInterface;
-
 import org.apache.phoenix.hbase.index.table.HTableFactory;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 
@@ -41,11 +41,16 @@ class FakeTableFactory implements HTableFactory {
 
   @Override
   public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
-    return this.tables.get(tablename);
+    return getTable(tablename, null);
   }
 
   @Override
   public void shutdown() {
     shutdown = true;
   }
+
+    @Override
+    public HTableInterface getTable(ImmutableBytesPtr tablename, ExecutorService pool) throws IOException {
+        return this.tables.get(tablename);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/be90ecf2/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestCachingHTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestCachingHTableFactory.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestCachingHTableFactory.java
index adf82f3..93ac3a6 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestCachingHTableFactory.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestCachingHTableFactory.java
@@ -17,14 +17,18 @@
  */
 package org.apache.phoenix.hbase.index.write;
 
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
-import org.mockito.Mockito;
-
 import org.apache.phoenix.hbase.index.table.CachingHTableFactory;
 import org.apache.phoenix.hbase.index.table.HTableFactory;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.junit.Test;
+import org.mockito.Mockito;
 
 public class TestCachingHTableFactory {
 
@@ -32,27 +36,34 @@ public class TestCachingHTableFactory {
   public void testCacheCorrectlyExpiresTable() throws Exception {
     // setup the mocks for the tables we will request
     HTableFactory delegate = Mockito.mock(HTableFactory.class);
+    RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class);
+    Configuration conf =new Configuration();
+    Mockito.when(e.getConfiguration()).thenReturn(conf);
+    Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>());
     ImmutableBytesPtr t1 = new ImmutableBytesPtr(Bytes.toBytes("t1"));
     ImmutableBytesPtr t2 = new ImmutableBytesPtr(Bytes.toBytes("t2"));
     ImmutableBytesPtr t3 = new ImmutableBytesPtr(Bytes.toBytes("t3"));
     HTableInterface table1 = Mockito.mock(HTableInterface.class);
     HTableInterface table2 = Mockito.mock(HTableInterface.class);
     HTableInterface table3 = Mockito.mock(HTableInterface.class);
-    Mockito.when(delegate.getTable(t1)).thenReturn(table1);
-    Mockito.when(delegate.getTable(t2)).thenReturn(table2);
-    Mockito.when(delegate.getTable(t3)).thenReturn(table3);
+    
     
     // setup our factory with a cache size of 2
-    CachingHTableFactory factory = new CachingHTableFactory(delegate, 2);
-    factory.getTable(t1);
-    factory.getTable(t2);
-    factory.getTable(t3);
+    CachingHTableFactory factory = new CachingHTableFactory(delegate, 2, e);
+    Mockito.when(delegate.getTable(t1,factory.getPool())).thenReturn(table1);
+    Mockito.when(delegate.getTable(t2,factory.getPool())).thenReturn(table2);
+    Mockito.when(delegate.getTable(t3,factory.getPool())).thenReturn(table3);
+    
+    HTableInterface ft1 =factory.getTable(t1);
+    HTableInterface ft2 =factory.getTable(t2);
+    ft1.close();
+    HTableInterface ft3 = factory.getTable(t3);
     // get the same table a second time, after it has gone out of cache
     factory.getTable(t1);
     
-    Mockito.verify(delegate, Mockito.times(2)).getTable(t1);
-    Mockito.verify(delegate, Mockito.times(1)).getTable(t2);
-    Mockito.verify(delegate, Mockito.times(1)).getTable(t3);
+    Mockito.verify(delegate, Mockito.times(2)).getTable(t1,factory.getPool());
+    Mockito.verify(delegate, Mockito.times(1)).getTable(t2,factory.getPool());
+    Mockito.verify(delegate, Mockito.times(1)).getTable(t3,factory.getPool());
     Mockito.verify(table1).close();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/be90ecf2/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
index 8f576cf..76ea933 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
@@ -29,6 +29,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -90,6 +91,10 @@ public class TestIndexWriter {
     LOG.info("Current thread is interrupted: " + Thread.interrupted());
     Abortable abort = new StubAbortable();
     Stoppable stop = Mockito.mock(Stoppable.class);
+    RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class);
+    Configuration conf =new Configuration();
+    Mockito.when(e.getConfiguration()).thenReturn(conf);
+    Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>());
     ExecutorService exec = Executors.newFixedThreadPool(1);
     Map<ImmutableBytesPtr, HTableInterface> tables = new HashMap<ImmutableBytesPtr, HTableInterface>();
     FakeTableFactory factory = new FakeTableFactory(tables);
@@ -117,7 +122,7 @@ public class TestIndexWriter {
 
     // setup the writer and failure policy
     ParallelWriterIndexCommitter committer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
-    committer.setup(factory, exec, abort, stop, 2);
+    committer.setup(factory, exec, abort, stop, 2, e);
     KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy();
     policy.setup(stop, abort);
     IndexWriter writer = new IndexWriter(committer, policy);
@@ -164,7 +169,10 @@ public class TestIndexWriter {
     Mockito.when(table.batch(Mockito.anyList())).thenThrow(
       new IOException("Intentional IOException for failed first write."));
     Mockito.when(table.getTableName()).thenReturn(tableName);
-
+    RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class);
+    Configuration conf =new Configuration();
+    Mockito.when(e.getConfiguration()).thenReturn(conf);
+    Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>());
     // second table just blocks to make sure that the abort propagates to the third task
     final CountDownLatch waitOnAbortedLatch = new CountDownLatch(1);
     final boolean[] failed = new boolean[] { false };
@@ -190,15 +198,15 @@ public class TestIndexWriter {
     tables.put(new ImmutableBytesPtr(tableName2), table2);
 
     ParallelWriterIndexCommitter committer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
-    committer.setup(factory, exec, abort, stop, 2);
+    committer.setup(factory, exec, abort, stop, 2, e);
     KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy();
     policy.setup(stop, abort);
     IndexWriter writer = new IndexWriter(committer, policy);
     try {
       writer.write(indexUpdates);
       fail("Should not have successfully completed all index writes");
-    } catch (SingleIndexWriteFailureException e) {
-      LOG.info("Correctly got a failure to reach the index", e);
+    } catch (SingleIndexWriteFailureException s) {
+      LOG.info("Correctly got a failure to reach the index", s);
       // should have correctly gotten the correct abort, so let the next task execute
       waitOnAbortedLatch.countDown();
     }
@@ -223,6 +231,10 @@ public class TestIndexWriter {
     // single thread factory so the older request gets queued
     ExecutorService exec = Executors.newFixedThreadPool(1);
     Map<ImmutableBytesPtr, HTableInterface> tables = new HashMap<ImmutableBytesPtr, HTableInterface>();
+    RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class);
+    Configuration conf =new Configuration();
+    Mockito.when(e.getConfiguration()).thenReturn(conf);
+    Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>());
     FakeTableFactory factory = new FakeTableFactory(tables);
 
     byte[] tableName = this.testName.getTableName();
@@ -257,7 +269,7 @@ public class TestIndexWriter {
 
     // setup the writer
     ParallelWriterIndexCommitter committer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
-    committer.setup(factory, exec, abort, stop, 2);
+    committer.setup(factory, exec, abort, stop, 2, e );
     KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy();
     policy.setup(stop, abort);
     final IndexWriter writer = new IndexWriter(committer, policy);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/be90ecf2/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
index 1f1e37e..ab88cd2 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
@@ -22,16 +22,19 @@ import static org.junit.Assert.assertTrue;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.phoenix.hbase.index.StubAbortable;
@@ -57,13 +60,17 @@ public class TestParalleIndexWriter {
   @Test
   public void testCorrectlyCleansUpResources() throws Exception{
     ExecutorService exec = Executors.newFixedThreadPool(1);
+    RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class);
+    Configuration conf =new Configuration();
+    Mockito.when(e.getConfiguration()).thenReturn(conf);
+    Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>());
     FakeTableFactory factory = new FakeTableFactory(
         Collections.<ImmutableBytesPtr, HTableInterface> emptyMap());
     ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
     Abortable mockAbort = Mockito.mock(Abortable.class);
     Stoppable mockStop = Mockito.mock(Stoppable.class);
     // create a simple writer
-    writer.setup(factory, exec, mockAbort, mockStop, 1);
+    writer.setup(factory, exec, mockAbort, mockStop, 1,e);
     // stop the writer
     writer.stop(this.test.getTableNameString() + " finished");
     assertTrue("Factory didn't get shutdown after writer#stop!", factory.shutdown);
@@ -82,7 +89,10 @@ public class TestParalleIndexWriter {
     Map<ImmutableBytesPtr, HTableInterface> tables =
         new HashMap<ImmutableBytesPtr, HTableInterface>();
     FakeTableFactory factory = new FakeTableFactory(tables);
-
+    RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class);
+    Configuration conf =new Configuration();
+    Mockito.when(e.getConfiguration()).thenReturn(conf);
+    Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>());
     ImmutableBytesPtr tableName = new ImmutableBytesPtr(this.test.getTableName());
     Put m = new Put(row);
     m.add(Bytes.toBytes("family"), Bytes.toBytes("qual"), null);
@@ -107,7 +117,7 @@ public class TestParalleIndexWriter {
 
     // setup the writer and failure policy
     ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
-    writer.setup(factory, exec, abort, stop, 1);
+    writer.setup(factory, exec, abort, stop, 1, e);
     writer.write(indexUpdates, true);
     assertTrue("Writer returned before the table batch completed! Likely a race condition tripped",
       completed[0]);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/be90ecf2/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
index 8eece3b..219f615 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
@@ -22,16 +22,19 @@ import static org.junit.Assert.assertTrue;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.phoenix.hbase.index.StubAbortable;
@@ -62,8 +65,12 @@ public class TestParalleWriterIndexCommitter {
     ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
     Abortable mockAbort = Mockito.mock(Abortable.class);
     Stoppable mockStop = Mockito.mock(Stoppable.class);
+    RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class);
+    Configuration conf =new Configuration();
+    Mockito.when(e.getConfiguration()).thenReturn(conf);
+    Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>());
     // create a simple writer
-    writer.setup(factory, exec, mockAbort, mockStop, 1);
+    writer.setup(factory, exec, mockAbort, mockStop, 1, e);
     // stop the writer
     writer.stop(this.test.getTableNameString() + " finished");
     assertTrue("Factory didn't get shutdown after writer#stop!", factory.shutdown);
@@ -77,6 +84,10 @@ public class TestParalleWriterIndexCommitter {
     LOG.info("Starting " + test.getTableNameString());
     LOG.info("Current thread is interrupted: " + Thread.interrupted());
     Abortable abort = new StubAbortable();
+    RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class);
+    Configuration conf =new Configuration();
+    Mockito.when(e.getConfiguration()).thenReturn(conf);
+    Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>());
     Stoppable stop = Mockito.mock(Stoppable.class);
     ExecutorService exec = Executors.newFixedThreadPool(1);
     Map<ImmutableBytesPtr, HTableInterface> tables =
@@ -107,7 +118,7 @@ public class TestParalleWriterIndexCommitter {
 
     // setup the writer and failure policy
     ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
-    writer.setup(factory, exec, abort, stop, 1);
+    writer.setup(factory, exec, abort, stop, 1, e);
     writer.write(indexUpdates, true);
     assertTrue("Writer returned before the table batch completed! Likely a race condition tripped",
       completed[0]);

Reply via email to