Repository: phoenix Updated Branches: refs/heads/4.8-HBase-1.1 56eb4bd28 -> be90ecf24
PHOENIX-3159 CachingHTableFactory may close HTable during eviction even if it is getting used for writing by another thread Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/be90ecf2 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/be90ecf2 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/be90ecf2 Branch: refs/heads/4.8-HBase-1.1 Commit: be90ecf24db1064133fbe726e0d508a629d5d74d Parents: 56eb4bd Author: Ankit Singhal <ankitsingha...@gmail.com> Authored: Mon Oct 3 19:14:47 2016 +0530 Committer: Ankit Singhal <ankitsingha...@gmail.com> Committed: Mon Oct 3 19:14:47 2016 +0530 ---------------------------------------------------------------------- .../hbase/index/table/CachingHTableFactory.java | 104 ++++++++++++++++--- .../index/table/CoprocessorHTableFactory.java | 6 ++ .../hbase/index/table/HTableFactory.java | 4 +- .../hbase/index/write/IndexWriterUtils.java | 3 + .../write/ParallelWriterIndexCommitter.java | 21 ++-- .../TrackingParallelWriterIndexCommitter.java | 18 ++-- .../hbase/index/write/FakeTableFactory.java | 9 +- .../index/write/TestCachingHTableFactory.java | 37 ++++--- .../hbase/index/write/TestIndexWriter.java | 24 +++-- .../index/write/TestParalleIndexWriter.java | 16 ++- .../write/TestParalleWriterIndexCommitter.java | 15 ++- 11 files changed, 197 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/be90ecf2/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java index 0c06e2b..d0df5b3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java @@ -17,18 +17,30 @@ */ package org.apache.phoenix.hbase.index.table; +import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.HTABLE_KEEP_ALIVE_KEY; +import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITES_THREAD_MAX_PER_REGIONSERVER_KEY; + import java.io.IOException; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.collections.map.LRUMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.util.Bytes; - +import org.apache.hadoop.hbase.util.Threads; +import org.apache.phoenix.execute.DelegateHTable; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import com.google.common.annotations.VisibleForTesting;; + /** * A simple cache that just uses usual GC mechanisms to cleanup unused {@link HTableInterface}s. * When requesting an {@link HTableInterface} via {@link #getTable}, you may get the same table as @@ -47,7 +59,7 @@ public class CachingHTableFactory implements HTableFactory { public class HTableInterfaceLRUMap extends LRUMap { public HTableInterfaceLRUMap(int cacheSize) { - super(cacheSize); + super(cacheSize, true); } @Override @@ -58,12 +70,18 @@ public class CachingHTableFactory implements HTableFactory { + " because it was evicted from the cache."); } try { - table.close(); + synchronized (this) { // the whole operation of closing and checking the count should be atomic + // and should not conflict with getTable() + if (((CachedHTableWrapper)table).getReferenceCount() <= 0) { + table.close(); + return true; + } + } } catch (IOException e) { LOG.info("Failed to correctly close HTable: " + Bytes.toString(table.getTableName()) + " ignoring since being removed from queue."); } - return true; + return false; } } @@ -73,38 +91,94 @@ public class CachingHTableFactory implements HTableFactory { private static final Log LOG = LogFactory.getLog(CachingHTableFactory.class); private static final String CACHE_SIZE_KEY = "index.tablefactory.cache.size"; - private static final int DEFAULT_CACHE_SIZE = 10; + private static final int DEFAULT_CACHE_SIZE = 1000; private HTableFactory delegate; @SuppressWarnings("rawtypes") Map openTables; + private ThreadPoolExecutor pool; - public CachingHTableFactory(HTableFactory tableFactory, Configuration conf) { - this(tableFactory, getCacheSize(conf)); + public CachingHTableFactory(HTableFactory tableFactory, Configuration conf, RegionCoprocessorEnvironment env) { + this(tableFactory, getCacheSize(conf), env); } - public CachingHTableFactory(HTableFactory factory, int cacheSize) { + public CachingHTableFactory(HTableFactory factory, int cacheSize, RegionCoprocessorEnvironment env) { this.delegate = factory; openTables = new HTableInterfaceLRUMap(cacheSize); + this.pool = new ThreadPoolExecutor(1, + env.getConfiguration().getInt(INDEX_WRITES_THREAD_MAX_PER_REGIONSERVER_KEY, Integer.MAX_VALUE), + env.getConfiguration().getInt(HTABLE_KEEP_ALIVE_KEY, 60), TimeUnit.SECONDS, + new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("CachedHtables")); + pool.allowCoreThreadTimeOut(true); } - + @Override @SuppressWarnings("unchecked") - public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException { + public HTableInterface getTable(ImmutableBytesPtr tablename, ExecutorService pool) throws IOException { ImmutableBytesPtr tableBytes = new ImmutableBytesPtr(tablename); synchronized (openTables) { - HTableInterface table = (HTableInterface) openTables.get(tableBytes); + CachedHTableWrapper table = (CachedHTableWrapper) openTables.get(tableBytes); if (table == null) { - table = delegate.getTable(tablename); + table = new CachedHTableWrapper(delegate.getTable(tablename, pool)); openTables.put(tableBytes, table); } + table.incrementReferenceCount(); return table; } } @Override - public void shutdown() { - this.delegate.shutdown(); - } + public void shutdown() { + this.delegate.shutdown(); + this.pool.shutdown(); + try { + boolean terminated = false; + do { + // wait until the pool has terminated + terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS); + } while (!terminated); + } catch (InterruptedException e) { + this.pool.shutdownNow(); + LOG.warn("waitForTermination interrupted"); + } + } + + public static class CachedHTableWrapper extends DelegateHTable { + + private AtomicInteger referenceCount = new AtomicInteger(); + + public CachedHTableWrapper(HTableInterface table) { + super(table); + } + + @Override + public synchronized void close() throws IOException { + if (getReferenceCount() > 0) { + this.referenceCount.decrementAndGet(); + } else { + // During LRU eviction + super.close(); + } + } + + public void incrementReferenceCount() { + this.referenceCount.incrementAndGet(); + } + + public int getReferenceCount() { + return this.referenceCount.get(); + } + + } + + @Override + public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException { + return getTable(tablename, this.pool); + } + + @VisibleForTesting + public ThreadPoolExecutor getPool(){ + return this.pool; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/be90ecf2/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java index ded618d..45e271d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java @@ -18,6 +18,7 @@ package org.apache.phoenix.hbase.index.table; import java.io.IOException; +import java.util.concurrent.ExecutorService; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.TableName; @@ -36,6 +37,11 @@ public class CoprocessorHTableFactory implements HTableFactory { public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException { return this.e.getTable(TableName.valueOf(tablename.copyBytesIfNecessary())); } + + @Override + public HTableInterface getTable(ImmutableBytesPtr tablename,ExecutorService pool) throws IOException { + return this.e.getTable(TableName.valueOf(tablename.copyBytesIfNecessary()), pool); + } @Override public void shutdown() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/be90ecf2/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableFactory.java index bef3d34..e6a2e60 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableFactory.java @@ -19,9 +19,9 @@ package org.apache.phoenix.hbase.index.table; import java.io.IOException; +import java.util.concurrent.ExecutorService; import org.apache.hadoop.hbase.client.HTableInterface; - import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; public interface HTableFactory { @@ -29,4 +29,6 @@ public interface HTableFactory { public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException; public void shutdown(); + + public HTableInterface getTable(ImmutableBytesPtr tablename, ExecutorService pool) throws IOException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/be90ecf2/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java index b8b0079..6eb657d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java @@ -54,6 +54,9 @@ public class IndexWriterUtils { /** Configuration key that HBase uses to set the max number of threads for an HTable */ public static final String HTABLE_THREAD_KEY = "hbase.htable.threads.max"; + public static final String INDEX_WRITES_THREAD_MAX_PER_REGIONSERVER_KEY = "phoenix.index.writes.threads.max"; + public static final String HTABLE_KEEP_ALIVE_KEY = "hbase.htable.threads.keepalivetime"; + private IndexWriterUtils() { // private ctor for utilites } http://git-wip-us.apache.org/repos/asf/phoenix/blob/be90ecf2/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java index 0e6db58..1ab7338 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java @@ -21,14 +21,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.Region; import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException; import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure; import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner; @@ -41,7 +37,6 @@ import org.apache.phoenix.hbase.index.table.HTableFactory; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.util.IndexUtil; -import org.apache.phoenix.util.MetaDataUtil; import com.google.common.collect.Multimap; @@ -59,7 +54,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter { public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.writer.threads.max"; private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10; - private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = "index.writer.threads.keepalivetime"; + public static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = "index.writer.threads.keepalivetime"; private static final Log LOG = LogFactory.getLog(ParallelWriterIndexCommitter.class); private HTableFactory factory; @@ -84,7 +79,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter { new ThreadPoolBuilder(name, conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout( INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent, - CachingHTableFactory.getCacheSize(conf)); + CachingHTableFactory.getCacheSize(conf),env); this.kvBuilder = KeyValueBuilder.get(env.getHBaseVersion()); } @@ -93,8 +88,8 @@ public class ParallelWriterIndexCommitter implements IndexCommitter { * <p> * Exposed for TESTING */ - void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, int cacheSize) { - this.factory = new CachingHTableFactory(factory, cacheSize); + void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, int cacheSize, RegionCoprocessorEnvironment env) { + this.factory = new CachingHTableFactory(factory, cacheSize, env); this.pool = new QuickFailingTaskRunner(pool); this.stopped = stop; } @@ -151,6 +146,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter { if (LOG.isDebugEnabled()) { LOG.debug("Writing index update:" + mutations + " to table: " + tableReference); } + HTableInterface table = null; try { if (allowLocalUpdates && env!=null) { try { @@ -165,7 +161,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter { } } } - HTableInterface table = factory.getTable(tableReference.get()); + table = factory.getTable(tableReference.get()); throwFailureIfDone(); int i = 0; table.batch(mutations); @@ -178,6 +174,11 @@ public class ParallelWriterIndexCommitter implements IndexCommitter { Thread.currentThread().interrupt(); throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e); } + finally{ + if (table != null) { + table.close(); + } + } return null; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/be90ecf2/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java index fc27f76..d244d66 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java @@ -23,13 +23,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.Region; import org.apache.phoenix.hbase.index.CapturingAbortable; import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException; import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException; @@ -48,7 +45,6 @@ import org.apache.phoenix.hbase.index.write.IndexWriter; import org.apache.phoenix.hbase.index.write.IndexWriterUtils; import org.apache.phoenix.hbase.index.write.ParallelWriterIndexCommitter; import org.apache.phoenix.util.IndexUtil; -import org.apache.phoenix.util.MetaDataUtil; import com.google.common.collect.Multimap; @@ -95,7 +91,7 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter { new ThreadPoolBuilder(name, conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout( INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent, - CachingHTableFactory.getCacheSize(conf)); + CachingHTableFactory.getCacheSize(conf), env); } /** @@ -103,9 +99,10 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter { * <p> * Exposed for TESTING */ - void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, int cacheSize) { + void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, int cacheSize, + RegionCoprocessorEnvironment env) { this.pool = new WaitForCompletionTaskRunner(pool); - this.factory = new CachingHTableFactory(factory, cacheSize); + this.factory = new CachingHTableFactory(factory, cacheSize, env); this.abortable = new CapturingAbortable(abortable); this.stopped = stop; } @@ -148,6 +145,7 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter { @SuppressWarnings("deprecation") @Override public Boolean call() throws Exception { + HTableInterface table = null; try { // this may have been queued, but there was an abort/stop so we try to early exit throwFailureIfDone(); @@ -170,7 +168,7 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter { LOG.debug("Writing index update:" + mutations + " to table: " + tableReference); } - HTableInterface table = factory.getTable(tableReference.get()); + table = factory.getTable(tableReference.get()); throwFailureIfDone(); table.batch(mutations); } catch (InterruptedException e) { @@ -179,6 +177,10 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter { throw e; } catch (Exception e) { throw e; + } finally { + if (table != null) { + table.close(); + } } return Boolean.TRUE; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/be90ecf2/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/FakeTableFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/FakeTableFactory.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/FakeTableFactory.java index 2b6be18..4483a7f 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/FakeTableFactory.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/FakeTableFactory.java @@ -20,9 +20,9 @@ package org.apache.phoenix.hbase.index.write; import java.io.IOException; import java.util.Map; +import java.util.concurrent.ExecutorService; import org.apache.hadoop.hbase.client.HTableInterface; - import org.apache.phoenix.hbase.index.table.HTableFactory; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; @@ -41,11 +41,16 @@ class FakeTableFactory implements HTableFactory { @Override public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException { - return this.tables.get(tablename); + return getTable(tablename, null); } @Override public void shutdown() { shutdown = true; } + + @Override + public HTableInterface getTable(ImmutableBytesPtr tablename, ExecutorService pool) throws IOException { + return this.tables.get(tablename); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/be90ecf2/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestCachingHTableFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestCachingHTableFactory.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestCachingHTableFactory.java index adf82f3..93ac3a6 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestCachingHTableFactory.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestCachingHTableFactory.java @@ -17,14 +17,18 @@ */ package org.apache.phoenix.hbase.index.write; + +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.util.Bytes; -import org.junit.Test; -import org.mockito.Mockito; - import org.apache.phoenix.hbase.index.table.CachingHTableFactory; import org.apache.phoenix.hbase.index.table.HTableFactory; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.junit.Test; +import org.mockito.Mockito; public class TestCachingHTableFactory { @@ -32,27 +36,34 @@ public class TestCachingHTableFactory { public void testCacheCorrectlyExpiresTable() throws Exception { // setup the mocks for the tables we will request HTableFactory delegate = Mockito.mock(HTableFactory.class); + RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class); + Configuration conf =new Configuration(); + Mockito.when(e.getConfiguration()).thenReturn(conf); + Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>()); ImmutableBytesPtr t1 = new ImmutableBytesPtr(Bytes.toBytes("t1")); ImmutableBytesPtr t2 = new ImmutableBytesPtr(Bytes.toBytes("t2")); ImmutableBytesPtr t3 = new ImmutableBytesPtr(Bytes.toBytes("t3")); HTableInterface table1 = Mockito.mock(HTableInterface.class); HTableInterface table2 = Mockito.mock(HTableInterface.class); HTableInterface table3 = Mockito.mock(HTableInterface.class); - Mockito.when(delegate.getTable(t1)).thenReturn(table1); - Mockito.when(delegate.getTable(t2)).thenReturn(table2); - Mockito.when(delegate.getTable(t3)).thenReturn(table3); + // setup our factory with a cache size of 2 - CachingHTableFactory factory = new CachingHTableFactory(delegate, 2); - factory.getTable(t1); - factory.getTable(t2); - factory.getTable(t3); + CachingHTableFactory factory = new CachingHTableFactory(delegate, 2, e); + Mockito.when(delegate.getTable(t1,factory.getPool())).thenReturn(table1); + Mockito.when(delegate.getTable(t2,factory.getPool())).thenReturn(table2); + Mockito.when(delegate.getTable(t3,factory.getPool())).thenReturn(table3); + + HTableInterface ft1 =factory.getTable(t1); + HTableInterface ft2 =factory.getTable(t2); + ft1.close(); + HTableInterface ft3 = factory.getTable(t3); // get the same table a second time, after it has gone out of cache factory.getTable(t1); - Mockito.verify(delegate, Mockito.times(2)).getTable(t1); - Mockito.verify(delegate, Mockito.times(1)).getTable(t2); - Mockito.verify(delegate, Mockito.times(1)).getTable(t3); + Mockito.verify(delegate, Mockito.times(2)).getTable(t1,factory.getPool()); + Mockito.verify(delegate, Mockito.times(1)).getTable(t2,factory.getPool()); + Mockito.verify(delegate, Mockito.times(1)).getTable(t3,factory.getPool()); Mockito.verify(table1).close(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/be90ecf2/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java index 8f576cf..76ea933 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java @@ -29,6 +29,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -90,6 +91,10 @@ public class TestIndexWriter { LOG.info("Current thread is interrupted: " + Thread.interrupted()); Abortable abort = new StubAbortable(); Stoppable stop = Mockito.mock(Stoppable.class); + RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class); + Configuration conf =new Configuration(); + Mockito.when(e.getConfiguration()).thenReturn(conf); + Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>()); ExecutorService exec = Executors.newFixedThreadPool(1); Map<ImmutableBytesPtr, HTableInterface> tables = new HashMap<ImmutableBytesPtr, HTableInterface>(); FakeTableFactory factory = new FakeTableFactory(tables); @@ -117,7 +122,7 @@ public class TestIndexWriter { // setup the writer and failure policy ParallelWriterIndexCommitter committer = new ParallelWriterIndexCommitter(VersionInfo.getVersion()); - committer.setup(factory, exec, abort, stop, 2); + committer.setup(factory, exec, abort, stop, 2, e); KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy(); policy.setup(stop, abort); IndexWriter writer = new IndexWriter(committer, policy); @@ -164,7 +169,10 @@ public class TestIndexWriter { Mockito.when(table.batch(Mockito.anyList())).thenThrow( new IOException("Intentional IOException for failed first write.")); Mockito.when(table.getTableName()).thenReturn(tableName); - + RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class); + Configuration conf =new Configuration(); + Mockito.when(e.getConfiguration()).thenReturn(conf); + Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>()); // second table just blocks to make sure that the abort propagates to the third task final CountDownLatch waitOnAbortedLatch = new CountDownLatch(1); final boolean[] failed = new boolean[] { false }; @@ -190,15 +198,15 @@ public class TestIndexWriter { tables.put(new ImmutableBytesPtr(tableName2), table2); ParallelWriterIndexCommitter committer = new ParallelWriterIndexCommitter(VersionInfo.getVersion()); - committer.setup(factory, exec, abort, stop, 2); + committer.setup(factory, exec, abort, stop, 2, e); KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy(); policy.setup(stop, abort); IndexWriter writer = new IndexWriter(committer, policy); try { writer.write(indexUpdates); fail("Should not have successfully completed all index writes"); - } catch (SingleIndexWriteFailureException e) { - LOG.info("Correctly got a failure to reach the index", e); + } catch (SingleIndexWriteFailureException s) { + LOG.info("Correctly got a failure to reach the index", s); // should have correctly gotten the correct abort, so let the next task execute waitOnAbortedLatch.countDown(); } @@ -223,6 +231,10 @@ public class TestIndexWriter { // single thread factory so the older request gets queued ExecutorService exec = Executors.newFixedThreadPool(1); Map<ImmutableBytesPtr, HTableInterface> tables = new HashMap<ImmutableBytesPtr, HTableInterface>(); + RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class); + Configuration conf =new Configuration(); + Mockito.when(e.getConfiguration()).thenReturn(conf); + Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>()); FakeTableFactory factory = new FakeTableFactory(tables); byte[] tableName = this.testName.getTableName(); @@ -257,7 +269,7 @@ public class TestIndexWriter { // setup the writer ParallelWriterIndexCommitter committer = new ParallelWriterIndexCommitter(VersionInfo.getVersion()); - committer.setup(factory, exec, abort, stop, 2); + committer.setup(factory, exec, abort, stop, 2, e ); KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy(); policy.setup(stop, abort); final IndexWriter writer = new IndexWriter(committer, policy); http://git-wip-us.apache.org/repos/asf/phoenix/blob/be90ecf2/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java index 1f1e37e..ab88cd2 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java @@ -22,16 +22,19 @@ import static org.junit.Assert.assertTrue; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.phoenix.hbase.index.StubAbortable; @@ -57,13 +60,17 @@ public class TestParalleIndexWriter { @Test public void testCorrectlyCleansUpResources() throws Exception{ ExecutorService exec = Executors.newFixedThreadPool(1); + RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class); + Configuration conf =new Configuration(); + Mockito.when(e.getConfiguration()).thenReturn(conf); + Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>()); FakeTableFactory factory = new FakeTableFactory( Collections.<ImmutableBytesPtr, HTableInterface> emptyMap()); ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion()); Abortable mockAbort = Mockito.mock(Abortable.class); Stoppable mockStop = Mockito.mock(Stoppable.class); // create a simple writer - writer.setup(factory, exec, mockAbort, mockStop, 1); + writer.setup(factory, exec, mockAbort, mockStop, 1,e); // stop the writer writer.stop(this.test.getTableNameString() + " finished"); assertTrue("Factory didn't get shutdown after writer#stop!", factory.shutdown); @@ -82,7 +89,10 @@ public class TestParalleIndexWriter { Map<ImmutableBytesPtr, HTableInterface> tables = new HashMap<ImmutableBytesPtr, HTableInterface>(); FakeTableFactory factory = new FakeTableFactory(tables); - + RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class); + Configuration conf =new Configuration(); + Mockito.when(e.getConfiguration()).thenReturn(conf); + Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>()); ImmutableBytesPtr tableName = new ImmutableBytesPtr(this.test.getTableName()); Put m = new Put(row); m.add(Bytes.toBytes("family"), Bytes.toBytes("qual"), null); @@ -107,7 +117,7 @@ public class TestParalleIndexWriter { // setup the writer and failure policy ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion()); - writer.setup(factory, exec, abort, stop, 1); + writer.setup(factory, exec, abort, stop, 1, e); writer.write(indexUpdates, true); assertTrue("Writer returned before the table batch completed! Likely a race condition tripped", completed[0]); http://git-wip-us.apache.org/repos/asf/phoenix/blob/be90ecf2/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java index 8eece3b..219f615 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java @@ -22,16 +22,19 @@ import static org.junit.Assert.assertTrue; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.phoenix.hbase.index.StubAbortable; @@ -62,8 +65,12 @@ public class TestParalleWriterIndexCommitter { ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion()); Abortable mockAbort = Mockito.mock(Abortable.class); Stoppable mockStop = Mockito.mock(Stoppable.class); + RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class); + Configuration conf =new Configuration(); + Mockito.when(e.getConfiguration()).thenReturn(conf); + Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>()); // create a simple writer - writer.setup(factory, exec, mockAbort, mockStop, 1); + writer.setup(factory, exec, mockAbort, mockStop, 1, e); // stop the writer writer.stop(this.test.getTableNameString() + " finished"); assertTrue("Factory didn't get shutdown after writer#stop!", factory.shutdown); @@ -77,6 +84,10 @@ public class TestParalleWriterIndexCommitter { LOG.info("Starting " + test.getTableNameString()); LOG.info("Current thread is interrupted: " + Thread.interrupted()); Abortable abort = new StubAbortable(); + RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class); + Configuration conf =new Configuration(); + Mockito.when(e.getConfiguration()).thenReturn(conf); + Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>()); Stoppable stop = Mockito.mock(Stoppable.class); ExecutorService exec = Executors.newFixedThreadPool(1); Map<ImmutableBytesPtr, HTableInterface> tables = @@ -107,7 +118,7 @@ public class TestParalleWriterIndexCommitter { // setup the writer and failure policy ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion()); - writer.setup(factory, exec, abort, stop, 1); + writer.setup(factory, exec, abort, stop, 1, e); writer.write(indexUpdates, true); assertTrue("Writer returned before the table batch completed! Likely a race condition tripped", completed[0]);