[ https://issues.apache.org/jira/browse/TEPHRA-215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15858727#comment-15858727 ]
ASF GitHub Bot commented on TEPHRA-215: --------------------------------------- Github user poornachandra commented on a diff in the pull request: https://github.com/apache/incubator-tephra/pull/32#discussion_r100200476 --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java --- @@ -18,77 +18,85 @@ package org.apache.tephra.hbase.txprune; +import com.google.common.util.concurrent.AbstractIdleService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.TableName; -import org.apache.tephra.hbase.coprocessor.TransactionProcessor; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; /** * Thread that will write the the prune upper bound */ -public class PruneUpperBoundWriter { - private static final Log LOG = LogFactory.getLog(TransactionProcessor.class); +public class PruneUpperBoundWriter extends AbstractIdleService { + private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class); - private final TableName pruneStateTable; + private final TableName tableName; private final DataJanitorState dataJanitorState; - private final byte[] regionName; - private final String regionNameAsString; private final long pruneFlushInterval; - private final AtomicLong pruneUpperBound; - private final AtomicBoolean shouldFlush; + private final ConcurrentSkipListMap<ByteBuffer, Long> pruneEntries; + + private volatile boolean stopped; + private volatile Thread flushThread; - private Thread flushThread; private long lastChecked; - public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString, - byte[] regionName, long pruneFlushInterval) { - this.pruneStateTable = pruneStateTable; + public 
PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) { + this.tableName = tableName; this.dataJanitorState = dataJanitorState; - this.regionName = regionName; - this.regionNameAsString = regionNameAsString; this.pruneFlushInterval = pruneFlushInterval; - this.pruneUpperBound = new AtomicLong(); - this.shouldFlush = new AtomicBoolean(false); - startFlushThread(); + this.pruneEntries = new ConcurrentSkipListMap<>(); + } + + public void persistPruneEntry(byte[] regionName, long pruneUpperBound) { + // The number of entries in this map is bound by the number of regions in this region server and thus it will not + // grow indefinitely + pruneEntries.put(ByteBuffer.wrap(regionName), pruneUpperBound); } public boolean isAlive() { - return flushThread.isAlive(); + return flushThread != null && flushThread.isAlive(); } - public void persistPruneEntry(long pruneUpperBound) { - this.pruneUpperBound.set(pruneUpperBound); - this.shouldFlush.set(true); + @Override + protected void startUp() throws Exception { + LOG.info("Starting PruneUpperBoundWriter Thread."); + startFlushThread(); } - public void stop() { + @Override + protected void shutDown() throws Exception { if (flushThread != null) { + stopped = true; + LOG.info("Stopping PruneUpperBoundWriter Thread."); flushThread.interrupt(); + flushThread.join(TimeUnit.SECONDS.toMillis(1)); } } private void startFlushThread() { flushThread = new Thread("tephra-prune-upper-bound-writer") { @Override public void run() { - while (!isInterrupted()) { + while (!isInterrupted() && !stopped) { --- End diff -- We can replace the check for `stopped` with `isRunning()` > Share PruneUpperBoundWriter across all TransactionProcessors on the same > region server > -------------------------------------------------------------------------------------- > > Key: TEPHRA-215 > URL: https://issues.apache.org/jira/browse/TEPHRA-215 > Project: Tephra > Issue Type: Improvement > Affects Versions: 
0.11.0-incubating > Reporter: Gokul Gunasekaran > Assignee: Gokul Gunasekaran > Fix For: 0.11.0-incubating > > > Currently we start one prune upper bound writer thread per > TransactionProcessor coprocessor. Instead, all TransactionProcessors on the same > region server should share a single thread that periodically flushes writes to the prune state table. -- This message was sent by Atlassian JIRA (v6.3.15#6346)