[ https://issues.apache.org/jira/browse/TEPHRA-215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15858629#comment-15858629 ]
ASF GitHub Bot commented on TEPHRA-215: --------------------------------------- Github user gokulavasan commented on a diff in the pull request: https://github.com/apache/incubator-tephra/pull/32#discussion_r100187618 --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java --- @@ -18,56 +18,56 @@ package org.apache.tephra.hbase.txprune; +import com.google.common.util.concurrent.AbstractIdleService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.TableName; -import org.apache.tephra.hbase.coprocessor.TransactionProcessor; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; /** * Thread that will write the the prune upper bound */ -public class PruneUpperBoundWriter { - private static final Log LOG = LogFactory.getLog(TransactionProcessor.class); +public class PruneUpperBoundWriter extends AbstractIdleService { + private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class); - private final TableName pruneStateTable; private final DataJanitorState dataJanitorState; - private final byte[] regionName; - private final String regionNameAsString; private final long pruneFlushInterval; - private final AtomicLong pruneUpperBound; - private final AtomicBoolean shouldFlush; + private final ConcurrentSkipListMap<ByteBuffer, Long> pruneEntries; + + private volatile boolean stopped; + private volatile Thread flushThread; - private Thread flushThread; private long lastChecked; - public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString, - byte[] regionName, long pruneFlushInterval) { - this.pruneStateTable = pruneStateTable; + public 
PruneUpperBoundWriter(DataJanitorState dataJanitorState, long pruneFlushInterval) { this.dataJanitorState = dataJanitorState; - this.regionName = regionName; - this.regionNameAsString = regionNameAsString; this.pruneFlushInterval = pruneFlushInterval; - this.pruneUpperBound = new AtomicLong(); - this.shouldFlush = new AtomicBoolean(false); - startFlushThread(); + this.pruneEntries = new ConcurrentSkipListMap<>(); + } + + public void persistPruneEntry(byte[] regionName, long pruneUpperBound) { + // The number of entries in this map is bound by the number of regions in this region server and thus it will not + // grow indefinitely + pruneEntries.put(ByteBuffer.wrap(regionName), pruneUpperBound); } public boolean isAlive() { - return flushThread.isAlive(); + return flushThread != null && flushThread.isAlive(); } - public void persistPruneEntry(long pruneUpperBound) { - this.pruneUpperBound.set(pruneUpperBound); - this.shouldFlush.set(true); + @Override + protected void startUp() throws Exception { + startFlushThread(); } - public void stop() { + @Override + protected void shutDown() throws Exception { if (flushThread != null) { --- End diff --

Added log statements.

> Share PruneUpperBoundWriter across all TransactionProcessors on the same region server
> --------------------------------------------------------------------------------------
>
> Key: TEPHRA-215
> URL: https://issues.apache.org/jira/browse/TEPHRA-215
> Project: Tephra
> Issue Type: Improvement
> Affects Versions: 0.11.0-incubating
> Reporter: Gokul Gunasekaran
> Assignee: Gokul Gunasekaran
> Fix For: 0.11.0-incubating
>
> Currently we start one prune upperbound writer thread per TransactionProcessor coprocessor. Instead we should be able to share only one thread that flushes writes to the prune state table periodically.

--
This message was sent by Atlassian JIRA (v6.3.15#6346)