Author: kturner
Date: Thu Jul 5 16:19:05 2012
New Revision: 1357721

URL: http://svn.apache.org/viewvc?rev=1357721&view=rev
Log:
ACCUMULO-409 Modified log recovery to use new DistributedWorkQueue
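For context: before this change each tablet server raced for a per-file ZooKeeper lock to claim a write-ahead log to sort; after it, the master enqueues each log on a DistributedWorkQueue and any tablet server may claim it. A minimal sketch of that producer/consumer pattern, assuming only the DistributedWorkQueue and Processor signatures visible in the diffs below (the queue path, file name, and pool size here are illustrative):

  import java.util.concurrent.ThreadPoolExecutor;

  import org.apache.accumulo.core.util.SimpleThreadPool;
  import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
  import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;

  public class WorkQueueSketch {
    public static void main(String[] args) throws Exception {
      String queuePath = "/accumulo/<instance-id>/recovery"; // illustrative ZooKeeper path

      // Producer side (the master): enqueue one unit of work, keyed by the
      // log file name, carrying the source location as the payload.
      new DistributedWorkQueue(queuePath).addWork("wal-0001", "hdfs://nn/wal/wal-0001".getBytes());

      // Consumer side (each tablet server): claim and process queue entries
      // on a shared, bounded thread pool.
      ThreadPoolExecutor pool = new SimpleThreadPool(4, "distributed work queue");
      new DistributedWorkQueue(queuePath).startProcessing(new Processor() {
        @Override
        public Processor newProcessor() {
          return this; // the real LogProcessor below returns a fresh instance per work item
        }

        @Override
        public void process(String child, byte[] data) {
          // child == "wal-0001"; data == the source bytes. Do the work here;
          // the master learns of completion when the work node disappears.
        }
      }, pool);
    }
  }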
Modified:
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java?rev=1357721&r1=1357720&r2=1357721&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java Thu Jul 5 16:19:05 2012
@@ -21,9 +21,9 @@ import java.io.IOException;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.Repo;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.tableOps.MasterRepo;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -48,18 +48,19 @@ public class SubmitFileForRecovery exten
   public Repo<Master> call(long tid, final Master master) throws Exception {
     master.updateRecoveryInProgress(file);
     String source = RecoverLease.getSource(master, server, file).toString();
+    new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).addWork(file, source.getBytes());
+
     ZooReaderWriter zoo = ZooReaderWriter.getInstance();
     final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + file;
-    zoo.putPersistentData(path, source.getBytes(), NodeExistsPolicy.SKIP);
     log.info("Created zookeeper entry " + path + " with data " + source);
     zoo.exists(path, new Watcher() {
       @Override
       public void process(WatchedEvent event) {
         switch (event.getType()) {
-          case NodeDataChanged:
+          case NodeDeleted:
             log.info("noticed recovery entry for " + file + " was removed");
             FileSystem fs = master.getFileSystem();
-            Path finished = new Path(Constants.getRecoveryDir(master.getSystemConfiguration()), "finished");
+            Path finished = new Path(Constants.getRecoveryDir(master.getSystemConfiguration()) + "/" + file, "finished");
             try {
               if (fs.exists(finished))
                 log.info("log recovery for " + file + " successful");
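Note the semantics the master now relies on: DistributedWorkQueue.addWork creates the recovery node, and the completion watcher fires on NodeDeleted rather than NodeDataChanged, which implies the queue removes the node once a tablet server finishes the work. Success versus failure is then read from marker files under the per-file recovery directory (LogSorter below creates "finished" on success and "failed" on error). A sketch of that marker check, assuming Hadoop's FileSystem API; the helper class and paths are hypothetical:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  // Hypothetical helper: decide whether a log sort succeeded by probing the
  // marker files the sort leaves under <recoveryDir>/<file>.
  public class RecoveryMarkerCheck {
    public static boolean recoverySucceeded(FileSystem fs, String recoveryDir, String file) throws Exception {
      Path base = new Path(recoveryDir, file);
      if (fs.exists(new Path(base, "finished")))
        return true;  // sorted map files are complete
      if (fs.exists(new Path(base, "failed")))
        return false; // the sorting tablet server hit an error
      throw new IllegalStateException("work node removed but no marker under " + base);
    }

    public static void main(String[] args) throws Exception {
      FileSystem fs = FileSystem.get(new Configuration());
      System.out.println(recoverySucceeded(fs, "/accumulo/recovery", "wal-0001"));
    }
  }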
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1357721&r1=1357720&r2=1357721&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Thu Jul 5 16:19:05 2012
@@ -51,7 +51,6 @@ import java.util.concurrent.ArrayBlockin
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -126,9 +125,9 @@ import org.apache.accumulo.core.util.Cac
 import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.LoggingRunnable;
-import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.util.ServerServices;
 import org.apache.accumulo.core.util.ServerServices.Service;
+import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.util.Stat;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
@@ -2714,22 +2713,22 @@ public class TabletServer extends Abstra
     }
     clientAddress = new InetSocketAddress(clientAddress.getAddress(), clientPort);
     announceExistence();
-    try {
-      logSorter.startWatchingForRecoveryLogs(getClientAddressString());
-    } catch (Exception ex) {
-      log.error("Error setting watches for recoveries");
-      throw new RuntimeException(ex);
-    }
-    ThreadPoolExecutor distWorkQThreadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(getSystemConfiguration().getCount(Property.TSERV_WORKQ_THREADS),
-        new NamingThreadFactory("distributed work queue"));
+    ThreadPoolExecutor distWorkQThreadPool = new SimpleThreadPool(getSystemConfiguration().getCount(Property.TSERV_WORKQ_THREADS), "distributed work queue");

-    bulkFailedCopyQ = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZBULK_FAILED_COPYQ);
+    bulkFailedCopyQ = new DistributedWorkQueue(ZooUtil.getRoot(instance) + Constants.ZBULK_FAILED_COPYQ);
     try {
       bulkFailedCopyQ.startProcessing(new BulkFailedCopyProcessor(), distWorkQThreadPool);
     } catch (Exception e1) {
       throw new RuntimeException("Failed to start distributed work queue for copying ", e1);
     }
+
+    try {
+      logSorter.startWatchingForRecoveryLogs(distWorkQThreadPool);
+    } catch (Exception ex) {
+      log.error("Error setting watches for recoveries");
+      throw new RuntimeException(ex);
+    }

     try {
       OBJECT_NAME = new ObjectName("accumulo.server.metrics:service=TServerInfo,name=TabletServerMBean,instance=" + Thread.currentThread().getName());
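A design note on the hunk above: the tablet server now builds one bounded SimpleThreadPool sized by TSERV_WORKQ_THREADS and hands the same executor to both the bulk-failed-copy queue and the log sorter, so recovery sorts and failed bulk-import copies contend for a single fixed set of threads instead of each subsystem spawning its own. The sharing pattern, illustrated self-contained with plain java.util.concurrent (names and sizes are illustrative):

  import java.util.concurrent.LinkedBlockingQueue;
  import java.util.concurrent.ThreadPoolExecutor;
  import java.util.concurrent.TimeUnit;

  public class SharedPoolSketch {
    public static void main(String[] args) throws InterruptedException {
      // One bounded pool, analogous to new SimpleThreadPool(TSERV_WORKQ_THREADS, "distributed work queue").
      ThreadPoolExecutor shared = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

      // Two independent producers submit into the same pool; at most two of
      // their tasks run concurrently, no matter which producer they came from.
      for (int i = 0; i < 3; i++) {
        final int n = i;
        shared.execute(new Runnable() {
          public void run() {
            System.out.println("bulk failed copy task " + n);
          }
        });
        shared.execute(new Runnable() {
          public void run() {
            System.out.println("log sort task " + n);
          }
        });
      }

      shared.shutdown();
      shared.awaitTermination(10, TimeUnit.SECONDS);
    }
  }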
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1357721&r1=1357720&r2=1357721&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Thu Jul 5 16:19:05 2012
@@ -25,8 +25,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Random;
-import java.util.TimerTask;
 import java.util.concurrent.ThreadPoolExecutor;

 import org.apache.accumulo.core.Constants;
@@ -37,65 +35,84 @@ import org.apache.accumulo.core.master.t
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
-import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.server.logger.LogFileKey;
 import org.apache.accumulo.server.logger.LogFileValue;
-import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.accumulo.server.zookeeper.ZooLock;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MapFile;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;

 /**
  * 
  */
 public class LogSorter {
+  private static final Logger log = Logger.getLogger(LogSorter.class);
   FileSystem fs;
   AccumuloConfiguration conf;

-  private Map<String,Work> currentWork = new HashMap<String,Work>();
+  private Map<String,LogProcessor> currentWork = Collections.synchronizedMap(new HashMap<String,LogProcessor>());

-  class Work implements Runnable {
-    final String name;
-    FSDataInputStream input;
-    final String destPath;
-    long bytesCopied = -1;
-    long sortStart = 0;
-    long sortStop = -1;
-    private final LogSortNotifier cback;
+  class LogProcessor implements Processor {

-    synchronized long getBytesCopied() throws IOException {
-      return input == null ? bytesCopied : input.getPos();
+    private FSDataInputStream input;
+    private long bytesCopied = -1;
+    private long sortStart = 0;
+    private long sortStop = -1;
+
+    @Override
+    public Processor newProcessor() {
+      return new LogProcessor();
+    }
+
+    @Override
+    public void process(String child, byte[] data) {
+      String dest = Constants.getRecoveryDir(conf) + "/" + child;
+      String src = new String(data);
+      String name = new Path(src).getName();
+
+      synchronized (currentWork) {
+        if (currentWork.containsKey(name))
+          return;
+        currentWork.put(name, this);
+      }
+
+      try {
+        log.info("Copying " + src + " to " + dest);
+        sort(name, new Path(src), dest);
+      } finally {
+        currentWork.remove(name);
+      }
     }

-    Work(String name, FSDataInputStream input, String destPath, LogSortNotifier cback) {
-      this.name = name;
-      this.input = input;
-      this.destPath = destPath;
-      this.cback = cback;
-    }
-
-    synchronized boolean finished() {
-      return input == null;
-    }
-
-    public void run() {
-      sortStart = System.currentTimeMillis();
+    public void sort(String name, Path srcPath, String destPath) {
+
+      synchronized (this) {
+        sortStart = System.currentTimeMillis();
+      }
+
       String formerThreadName = Thread.currentThread().getName();
       int part = 0;
       try {
+
+        // the following call does not throw an exception if the file/dir does not exist
+        fs.delete(new Path(destPath), true);
+
+        FSDataInputStream tmpInput = fs.open(srcPath);
+        synchronized (this) {
+          this.input = tmpInput;
+        }
+
         final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
         Thread.currentThread().setName("Sorting " + name + " for recovery");
         while (true) {
-          final ArrayList<Pair<LogFileKey, LogFileValue>> buffer = new ArrayList<Pair<LogFileKey, LogFileValue>>();
+          final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<Pair<LogFileKey,LogFileValue>>();
           try {
             long start = input.getPos();
             while (input.getPos() - start < bufferSize) {
@@ -103,29 +120,26 @@ public class LogSorter {
               LogFileValue value = new LogFileValue();
               key.readFields(input);
               value.readFields(input);
-              buffer.add(new Pair<LogFileKey, LogFileValue>(key, value));
+              buffer.add(new Pair<LogFileKey,LogFileValue>(key, value));
             }
-            writeBuffer(buffer, part++);
+            writeBuffer(destPath, buffer, part++);
             buffer.clear();
           } catch (EOFException ex) {
-            writeBuffer(buffer, part++);
+            writeBuffer(destPath, buffer, part++);
             break;
           }
         }
         fs.create(new Path(destPath, "finished")).close();
-        log.debug("Log copy/sort of " + name + " complete");
+        log.info("Finished log sort " + name + " " + getBytesCopied() + " bytes " + part + " parts in " + getSortTime() + "ms");
       } catch (Throwable t) {
         try {
+          // parent dir may not exist
+          fs.mkdirs(new Path(destPath));
           fs.create(new Path(destPath, "failed")).close();
         } catch (IOException e) {
           log.error("Error creating failed flag file " + name, e);
         }
         log.error(t, t);
-        try {
-          cback.notice(name, getBytesCopied(), part, getSortTime(), t.toString());
-        } catch (Exception ex) {
-          log.error("Strange error notifying the master of a logSort problem for file " + name);
-        }
       } finally {
         Thread.currentThread().setName(formerThreadName);
         try {
@@ -133,19 +147,13 @@
         } catch (IOException e) {
           log.error("Error during cleanup sort/copy " + name, e);
         }
-        sortStop = System.currentTimeMillis();
-        synchronized (currentWork) {
-          currentWork.remove(name);
-        }
-        try {
-          cback.notice(name, getBytesCopied(), part, getSortTime(), "");
-        } catch (Exception ex) {
-          log.error("Strange error reporting successful log sort " + name, ex);
+        synchronized (this) {
+          sortStop = System.currentTimeMillis();
         }
       }
     }

-    private void writeBuffer(ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+    private void writeBuffer(String destPath, ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
       String path = destPath + String.format("/part-r-%05d", part++);
       MapFile.Writer output = new MapFile.Writer(fs.getConf(), fs, path, LogFileKey.class, LogFileValue.class);
       try {
@@ -162,7 +170,7 @@
         output.close();
       }
     }
-    
+
     synchronized void close() throws IOException {
       bytesCopied = input.getPos();
       input.close();
@@ -177,9 +185,13 @@
       }
       return 0;
     }
-  };
+
+    synchronized long getBytesCopied() throws IOException {
+      return input == null ? bytesCopied : input.getPos();
+    }
+  }

-  final ThreadPoolExecutor threadPool;
+  ThreadPoolExecutor threadPool;
   private Instance instance;

   public LogSorter(Instance instance, FileSystem fs, AccumuloConfiguration conf) {
@@ -189,132 +201,16 @@
     int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT);
     this.threadPool = new SimpleThreadPool(threadPoolSize, this.getClass().getName());
   }
-
-  public void startWatchingForRecoveryLogs(final String serverName) throws KeeperException, InterruptedException {
-    final String path = ZooUtil.getRoot(instance) + Constants.ZRECOVERY;
-    final ZooReaderWriter zoo = ZooReaderWriter.getInstance();
-    zoo.mkdirs(path);
-    List<String> children = zoo.getChildren(path, new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        switch (event.getType()) {
-          case NodeChildrenChanged:
-            if (event.getPath().equals(path))
-              try {
-                attemptRecoveries(zoo, serverName, path, zoo.getChildren(path, this));
-              } catch (KeeperException e) {
-                log.error("Unable to get recovery information", e);
-              } catch (InterruptedException e) {
-                log.info("Interrupted getting recovery information", e);
-              }
-            else
-              log.info("Unexpected path for NodeChildrenChanged event " + event.getPath());
-            break;
-          case NodeCreated:
-          case NodeDataChanged:
-          case NodeDeleted:
-          case None:
-            log.info("Got unexpected zookeeper event: " + event.getType() + " for " + path);
-            break;
-
-        }
-      }
-    });
-    attemptRecoveries(zoo, serverName, path, children);
-    Random r = new Random();
-    // Add a little jitter to avoid all the tservers slamming zookeeper at once
-    SimpleTimer.getInstance().schedule(new TimerTask() {
-      @Override
-      public void run() {
-        try {
-          attemptRecoveries(zoo, serverName, path, zoo.getChildren(path));
-        } catch (KeeperException e) {
-          log.error("Unable to get recovery information", e);
-        } catch (InterruptedException e) {
-          log.info("Interrupted getting recovery information", e);
-        }
-      }
-    }, r.nextInt(1000), 60 * 1000);
-  }
-
-  private void attemptRecoveries(final ZooReaderWriter zoo, final String serverName, final String path, List<String> children) {
-    if (children.size() == 0)
-      return;
-
-    if (threadPool.getQueue().size() > 1)
-      return;
-    log.debug("Zookeeper references " + children.size() + " recoveries, attempting locks");
-    Random random = new Random();
-    Collections.shuffle(children, random);
-    try {
-      for (String child : children) {
-        final String childPath = path + "/" + child;
-        log.debug("Attempting to lock " + child);
-        ZooLock lock = new ZooLock(childPath);
-        if (lock.tryLock(new LockWatcher() {
-          @Override
-          public void lostLock(LockLossReason reason) {
-            log.info("Ignoring lost lock event, reason " + reason);
-          }
-        }, serverName.getBytes())) {
-          // Great... we got the lock, but maybe we're too busy
-          if (threadPool.getQueue().size() > 1) {
-            lock.unlock();
-            log.debug("got the lock, but thread pool is busy; released the lock on " + child);
-            break;
-          }
-          log.debug("got lock for " + child);
-          byte[] contents = zoo.getData(childPath, null);
-          String destination = Constants.getRecoveryDir(conf) + "/" + child;
-          startSort(new String(contents), destination, new LogSortNotifier() {
-            @Override
-            public void notice(String name, long bytes, int parts, long milliseconds, String error) {
-              log.info("Finished log sort " + name + " " + bytes + " bytes " + parts + " parts in " + milliseconds + "ms");
-              try {
-                zoo.recursiveDelete(childPath, NodeMissingPolicy.SKIP);
-              } catch (Exception e) {
-                log.error("Error received when trying to delete recovery entry in zookeeper " + childPath);
-              }
-              try {
-                attemptRecoveries(zoo, serverName, path, zoo.getChildren(path));
-              } catch (KeeperException e) {
-                log.error("Unable to get recovery information", e);
-              } catch (InterruptedException e) {
-                log.info("Interrupted getting recovery information", e);
-              }
-            }
-          });
-        } else {
-          log.debug("failed to get the lock " + child);
-        }
-      }
-    } catch (Throwable t) {
-      log.error("Unexpected error", t);
-    }
-  }
-
-  public interface LogSortNotifier {
-    public void notice(String name, long bytes, int parts, long milliseconds, String error);
-  }
-
-  private void startSort(String src, String dest, LogSortNotifier cback) throws IOException {
-    log.info("Copying " + src + " to " + dest);
-    fs.delete(new Path(dest), true);
-    Path srcPath = new Path(src);
-    synchronized (currentWork) {
-      Work work = new Work(srcPath.getName(), fs.open(srcPath), dest, cback);
-      if (!currentWork.containsKey(srcPath.getName())) {
-        threadPool.execute(work);
-        currentWork.put(srcPath.getName(), work);
-      }
-    }
+  public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool) throws KeeperException, InterruptedException {
+    this.threadPool = distWorkQThreadPool;
+    new DistributedWorkQueue(ZooUtil.getRoot(instance) + Constants.ZRECOVERY).startProcessing(new LogProcessor(), this.threadPool);
   }

   public List<RecoveryStatus> getLogSorts() {
     List<RecoveryStatus> result = new ArrayList<RecoveryStatus>();
     synchronized (currentWork) {
-      for (Entry<String,Work> entries : currentWork.entrySet()) {
+      for (Entry<String,LogProcessor> entries : currentWork.entrySet()) {
         RecoveryStatus status = new RecoveryStatus();
         status.name = entries.getKey();
         try {
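Taken together, LogSorter's consumer half reduces to: implement DistributedWorkQueue.Processor, receive the work node's name and payload in process(), and guard against re-processing with a shared map, since the same file may be offered again while a sort is still running. A stripped-down sketch of that shape, assuming only the Processor interface used in this diff (newProcessor() plus process(String, byte[])); the static map and the work body are illustrative:

  import java.util.Collections;
  import java.util.HashMap;
  import java.util.Map;

  import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;

  public class DedupProcessorSketch implements Processor {
    // Shared across processor instances so the same item is never worked twice
    // at once (the real LogSorter keeps this map on the enclosing instance).
    private static final Map<String,DedupProcessorSketch> currentWork =
        Collections.synchronizedMap(new HashMap<String,DedupProcessorSketch>());

    @Override
    public Processor newProcessor() {
      return new DedupProcessorSketch(); // one instance per work item
    }

    @Override
    public void process(String child, byte[] data) {
      synchronized (currentWork) {
        if (currentWork.containsKey(child))
          return; // another thread here is already on it
        currentWork.put(child, this);
      }
      try {
        // ... do the actual work with `data` (e.g. copy/sort a log file) ...
      } finally {
        currentWork.remove(child);
      }
    }
  }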