Repository: phoenix Updated Branches: refs/heads/master 18ea6edc0 -> b2ab339a1
PHOENIX-4021 - Remove CachingHTableFactory Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b2ab339a Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b2ab339a Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b2ab339a Branch: refs/heads/master Commit: b2ab339a1690384c89d928c9dd3244d7518a8033 Parents: 18ea6ed Author: Geoffrey Jacoby <[email protected]> Authored: Thu Jul 13 18:43:29 2017 -0700 Committer: gjacoby <[email protected]> Committed: Mon Jul 17 09:24:35 2017 -0700 ---------------------------------------------------------------------- .../hbase/index/table/CachingHTableFactory.java | 184 ------------------- .../write/ParallelWriterIndexCommitter.java | 11 +- .../TrackingParallelWriterIndexCommitter.java | 11 +- .../index/write/TestCachingHTableFactory.java | 69 ------- .../hbase/index/write/TestIndexWriter.java | 6 +- .../index/write/TestParalleIndexWriter.java | 4 +- .../write/TestParalleWriterIndexCommitter.java | 4 +- 7 files changed, 15 insertions(+), 274 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/b2ab339a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java deleted file mode 100644 index d0df5b3..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.hbase.index.table; - -import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.HTABLE_KEEP_ALIVE_KEY; -import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITES_THREAD_MAX_PER_REGIONSERVER_KEY; - -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.commons.collections.map.LRUMap; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Threads; -import org.apache.phoenix.execute.DelegateHTable; -import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; - -import com.google.common.annotations.VisibleForTesting;; - -/** - * A simple cache that just uses usual GC mechanisms to cleanup unused {@link HTableInterface}s. - * When requesting an {@link HTableInterface} via {@link #getTable}, you may get the same table as - * last time, or it may be a new table. - * <p> - * You <b>should not call {@link HTableInterface#close()} </b> that is handled when the table goes - * out of scope. Along the same lines, you must ensure to not keep a reference to the table for - * longer than necessary - this leak will ensure that the table never gets closed. - */ -public class CachingHTableFactory implements HTableFactory { - - /** - * LRUMap that closes the {@link HTableInterface} when the table is evicted - */ - @SuppressWarnings("serial") - public class HTableInterfaceLRUMap extends LRUMap { - - public HTableInterfaceLRUMap(int cacheSize) { - super(cacheSize, true); - } - - @Override - protected boolean removeLRU(LinkEntry entry) { - HTableInterface table = (HTableInterface) entry.getValue(); - if (LOG.isDebugEnabled()) { - LOG.debug("Closing connection to table: " + Bytes.toString(table.getTableName()) - + " because it was evicted from the cache."); - } - try { - synchronized (this) { // the whole operation of closing and checking the count should be atomic - // and should not conflict with getTable() - if (((CachedHTableWrapper)table).getReferenceCount() <= 0) { - table.close(); - return true; - } - } - } catch (IOException e) { - LOG.info("Failed to correctly close HTable: " + Bytes.toString(table.getTableName()) - + " ignoring since being removed from queue."); - } - return false; - } - } - - public static int getCacheSize(Configuration conf) { - return conf.getInt(CACHE_SIZE_KEY, DEFAULT_CACHE_SIZE); - } - - private static final Log LOG = LogFactory.getLog(CachingHTableFactory.class); - private static final String CACHE_SIZE_KEY = "index.tablefactory.cache.size"; - private static final int DEFAULT_CACHE_SIZE = 1000; - - private HTableFactory delegate; - - @SuppressWarnings("rawtypes") - Map openTables; - private ThreadPoolExecutor pool; - - public CachingHTableFactory(HTableFactory tableFactory, Configuration conf, RegionCoprocessorEnvironment env) { - this(tableFactory, getCacheSize(conf), env); - } - - public CachingHTableFactory(HTableFactory factory, int cacheSize, RegionCoprocessorEnvironment env) { - this.delegate = factory; - openTables = new HTableInterfaceLRUMap(cacheSize); - this.pool = new ThreadPoolExecutor(1, - env.getConfiguration().getInt(INDEX_WRITES_THREAD_MAX_PER_REGIONSERVER_KEY, Integer.MAX_VALUE), - env.getConfiguration().getInt(HTABLE_KEEP_ALIVE_KEY, 60), TimeUnit.SECONDS, - new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("CachedHtables")); - pool.allowCoreThreadTimeOut(true); - } - - @Override - @SuppressWarnings("unchecked") - public HTableInterface getTable(ImmutableBytesPtr tablename, ExecutorService pool) throws IOException { - ImmutableBytesPtr tableBytes = new ImmutableBytesPtr(tablename); - synchronized (openTables) { - CachedHTableWrapper table = (CachedHTableWrapper) openTables.get(tableBytes); - if (table == null) { - table = new CachedHTableWrapper(delegate.getTable(tablename, pool)); - openTables.put(tableBytes, table); - } - table.incrementReferenceCount(); - return table; - } - } - - @Override - public void shutdown() { - this.delegate.shutdown(); - this.pool.shutdown(); - try { - boolean terminated = false; - do { - // wait until the pool has terminated - terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS); - } while (!terminated); - } catch (InterruptedException e) { - this.pool.shutdownNow(); - LOG.warn("waitForTermination interrupted"); - } - } - - public static class CachedHTableWrapper extends DelegateHTable { - - private AtomicInteger referenceCount = new AtomicInteger(); - - public CachedHTableWrapper(HTableInterface table) { - super(table); - } - - @Override - public synchronized void close() throws IOException { - if (getReferenceCount() > 0) { - this.referenceCount.decrementAndGet(); - } else { - // During LRU eviction - super.close(); - } - } - - public void incrementReferenceCount() { - this.referenceCount.incrementAndGet(); - } - - public int getReferenceCount() { - return this.referenceCount.get(); - } - - } - - @Override - public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException { - return getTable(tablename, this.pool); - } - - @VisibleForTesting - public ThreadPoolExecutor getPool(){ - return this.pool; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b2ab339a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java index a537010..e4e8343 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java @@ -32,7 +32,6 @@ import org.apache.phoenix.hbase.index.parallel.Task; import org.apache.phoenix.hbase.index.parallel.TaskBatch; import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder; import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager; -import org.apache.phoenix.hbase.index.table.CachingHTableFactory; import org.apache.phoenix.hbase.index.table.HTableFactory; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; @@ -78,8 +77,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter { ThreadPoolManager.getExecutor( new ThreadPoolBuilder(name, conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout( - INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent, - CachingHTableFactory.getCacheSize(conf),env); + INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent, env); this.kvBuilder = KeyValueBuilder.get(env.getHBaseVersion()); } @@ -88,8 +86,8 @@ public class ParallelWriterIndexCommitter implements IndexCommitter { * <p> * Exposed for TESTING */ - void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, int cacheSize, RegionCoprocessorEnvironment env) { - this.factory = new CachingHTableFactory(factory, cacheSize, env); + void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, RegionCoprocessorEnvironment env) { + this.factory = factory; this.pool = new QuickFailingTaskRunner(pool); this.stopped = stop; } @@ -131,8 +129,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter { tasks.add(new Task<Void>() { /** - * Do the actual write to the primary table. We don't need to worry about closing the table because that - * is handled the {@link CachingHTableFactory}. + * Do the actual write to the primary table. * * @return */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/b2ab339a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java index 074d0b9..d2436ba 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java @@ -37,7 +37,6 @@ import org.apache.phoenix.hbase.index.parallel.TaskRunner; import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder; import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager; import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner; -import org.apache.phoenix.hbase.index.table.CachingHTableFactory; import org.apache.phoenix.hbase.index.table.HTableFactory; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; import org.apache.phoenix.hbase.index.write.IndexCommitter; @@ -90,8 +89,7 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter { ThreadPoolManager.getExecutor( new ThreadPoolBuilder(name, conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout( - INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent, - CachingHTableFactory.getCacheSize(conf), env); + INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent, env); } /** @@ -99,10 +97,10 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter { * <p> * Exposed for TESTING */ - void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, int cacheSize, + void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, RegionCoprocessorEnvironment env) { this.pool = new WaitForCompletionTaskRunner(pool); - this.factory = new CachingHTableFactory(factory, cacheSize, env); + this.factory = factory; this.abortable = new CapturingAbortable(abortable); this.stopped = stop; } @@ -139,8 +137,7 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter { tasks.add(new Task<Boolean>() { /** - * Do the actual write to the primary table. We don't need to worry about closing the table because that - * is handled the {@link CachingHTableFactory}. + * Do the actual write to the primary table. */ @SuppressWarnings("deprecation") @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/b2ab339a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestCachingHTableFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestCachingHTableFactory.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestCachingHTableFactory.java deleted file mode 100644 index 93ac3a6..0000000 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestCachingHTableFactory.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.hbase.index.write; - - -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.hbase.index.table.CachingHTableFactory; -import org.apache.phoenix.hbase.index.table.HTableFactory; -import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; -import org.junit.Test; -import org.mockito.Mockito; - -public class TestCachingHTableFactory { - - @Test - public void testCacheCorrectlyExpiresTable() throws Exception { - // setup the mocks for the tables we will request - HTableFactory delegate = Mockito.mock(HTableFactory.class); - RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class); - Configuration conf =new Configuration(); - Mockito.when(e.getConfiguration()).thenReturn(conf); - Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>()); - ImmutableBytesPtr t1 = new ImmutableBytesPtr(Bytes.toBytes("t1")); - ImmutableBytesPtr t2 = new ImmutableBytesPtr(Bytes.toBytes("t2")); - ImmutableBytesPtr t3 = new ImmutableBytesPtr(Bytes.toBytes("t3")); - HTableInterface table1 = Mockito.mock(HTableInterface.class); - HTableInterface table2 = Mockito.mock(HTableInterface.class); - HTableInterface table3 = Mockito.mock(HTableInterface.class); - - - // setup our factory with a cache size of 2 - CachingHTableFactory factory = new CachingHTableFactory(delegate, 2, e); - Mockito.when(delegate.getTable(t1,factory.getPool())).thenReturn(table1); - Mockito.when(delegate.getTable(t2,factory.getPool())).thenReturn(table2); - Mockito.when(delegate.getTable(t3,factory.getPool())).thenReturn(table3); - - HTableInterface ft1 =factory.getTable(t1); - HTableInterface ft2 =factory.getTable(t2); - ft1.close(); - HTableInterface ft3 = factory.getTable(t3); - // get the same table a second time, after it has gone out of cache - factory.getTable(t1); - - Mockito.verify(delegate, Mockito.times(2)).getTable(t1,factory.getPool()); - Mockito.verify(delegate, Mockito.times(1)).getTable(t2,factory.getPool()); - Mockito.verify(delegate, Mockito.times(1)).getTable(t3,factory.getPool()); - Mockito.verify(table1).close(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b2ab339a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java index 8317b5c..8fb1c10 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java @@ -128,7 +128,7 @@ public class TestIndexWriter { // setup the writer and failure policy ParallelWriterIndexCommitter committer = new ParallelWriterIndexCommitter(VersionInfo.getVersion()); - committer.setup(factory, exec, abort, stop, 2, e); + committer.setup(factory, exec, abort, stop, e); KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy(); policy.setup(stop, abort); IndexWriter writer = new IndexWriter(committer, policy); @@ -204,7 +204,7 @@ public class TestIndexWriter { tables.put(new ImmutableBytesPtr(tableName2), table2); ParallelWriterIndexCommitter committer = new ParallelWriterIndexCommitter(VersionInfo.getVersion()); - committer.setup(factory, exec, abort, stop, 2, e); + committer.setup(factory, exec, abort, stop, e); KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy(); policy.setup(stop, abort); IndexWriter writer = new IndexWriter(committer, policy); @@ -275,7 +275,7 @@ public class TestIndexWriter { // setup the writer ParallelWriterIndexCommitter committer = new ParallelWriterIndexCommitter(VersionInfo.getVersion()); - committer.setup(factory, exec, abort, stop, 2, e ); + committer.setup(factory, exec, abort, stop, e ); KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy(); policy.setup(stop, abort); final IndexWriter writer = new IndexWriter(committer, policy); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b2ab339a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java index ab88cd2..8e4e7db 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java @@ -70,7 +70,7 @@ public class TestParalleIndexWriter { Abortable mockAbort = Mockito.mock(Abortable.class); Stoppable mockStop = Mockito.mock(Stoppable.class); // create a simple writer - writer.setup(factory, exec, mockAbort, mockStop, 1,e); + writer.setup(factory, exec, mockAbort, mockStop,e); // stop the writer writer.stop(this.test.getTableNameString() + " finished"); assertTrue("Factory didn't get shutdown after writer#stop!", factory.shutdown); @@ -117,7 +117,7 @@ public class TestParalleIndexWriter { // setup the writer and failure policy ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion()); - writer.setup(factory, exec, abort, stop, 1, e); + writer.setup(factory, exec, abort, stop, e); writer.write(indexUpdates, true); assertTrue("Writer returned before the table batch completed! Likely a race condition tripped", completed[0]); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b2ab339a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java index 219f615..e737aa7 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java @@ -70,7 +70,7 @@ public class TestParalleWriterIndexCommitter { Mockito.when(e.getConfiguration()).thenReturn(conf); Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>()); // create a simple writer - writer.setup(factory, exec, mockAbort, mockStop, 1, e); + writer.setup(factory, exec, mockAbort, mockStop, e); // stop the writer writer.stop(this.test.getTableNameString() + " finished"); assertTrue("Factory didn't get shutdown after writer#stop!", factory.shutdown); @@ -118,7 +118,7 @@ public class TestParalleWriterIndexCommitter { // setup the writer and failure policy ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion()); - writer.setup(factory, exec, abort, stop, 1, e); + writer.setup(factory, exec, abort, stop, e); writer.write(indexUpdates, true); assertTrue("Writer returned before the table batch completed! Likely a race condition tripped", completed[0]);
