Repository: accumulo
Updated Branches:
  refs/heads/master 0b7d00db8 -> e2e5afb2a
ACCUMULO-3327 clean up book-keeping periodically, checking against zookeeper Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8efc9546 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8efc9546 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8efc9546 Branch: refs/heads/master Commit: 8efc9546400f9b76afc3bfad93046a487c147e82 Parents: 8ccd7e7 Author: Eric C. Newton <eric.new...@gmail.com> Authored: Tue May 12 12:42:14 2015 -0400 Committer: Eric C. Newton <eric.new...@gmail.com> Committed: Tue May 12 12:42:14 2015 -0400 ---------------------------------------------------------------------- .../server/zookeeper/TransactionWatcher.java | 21 +++++++ .../apache/accumulo/tserver/TabletServer.java | 4 ++ .../tserver/tablet/BulkImportCacheCleaner.java | 60 ++++++++++++++++++++ .../apache/accumulo/tserver/tablet/Tablet.java | 16 +++++- .../performance/metadata/FastBulkImportIT.java | 2 +- 5 files changed, 99 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/8efc9546/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java index 0e1cdfd..da94a3c 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java +++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java @@ -16,8 +16,13 @@ */ package org.apache.accumulo.server.zookeeper; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.zookeeper.ZooUtil; +import 
org.apache.accumulo.fate.zookeeper.IZooReader; import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; import org.apache.accumulo.fate.zookeeper.ZooReader; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; @@ -59,6 +64,22 @@ public class TransactionWatcher extends org.apache.accumulo.fate.zookeeper.Trans writer.recursiveDelete(ZooUtil.getRoot(instance) + "/" + type + "/" + tid + "-running", NodeMissingPolicy.SKIP); } + public static Set<Long> allTransactionsAlive(String type) throws KeeperException, InterruptedException { + final Instance instance = HdfsZooInstance.getInstance(); + final IZooReader reader = ZooReaderWriter.getInstance(); + final Set<Long> result = new HashSet<>(); + final String parent = ZooUtil.getRoot(instance) + "/" + type; + reader.sync(parent); + List<String> children = reader.getChildren(parent); + for (String child : children) { + if (child.endsWith("-running")) { + continue; + } + result.add(Long.parseLong(child)); + } + return result; + } + @Override public boolean transactionComplete(String type, long tid) throws Exception { String path = ZooUtil.getRoot(instance) + "/" + type + "/" + tid + "-running"; http://git-wip-us.apache.org/repos/asf/accumulo/blob/8efc9546/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 6f2c9a2..7154732 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -223,6 +223,7 @@ import org.apache.accumulo.tserver.session.ScanSession; import org.apache.accumulo.tserver.session.Session; import org.apache.accumulo.tserver.session.SessionManager; import org.apache.accumulo.tserver.session.UpdateSession; +import 
org.apache.accumulo.tserver.tablet.BulkImportCacheCleaner; import org.apache.accumulo.tserver.tablet.CommitSession; import org.apache.accumulo.tserver.tablet.CompactionInfo; import org.apache.accumulo.tserver.tablet.CompactionWatcher; @@ -2423,6 +2424,9 @@ public class TabletServer extends AccumuloServerContext implements Runnable { }; SimpleTimer.getInstance(aconf).schedule(replicationWorkThreadPoolResizer, 10000, 30000); + final long CLEANUP_BULK_LOADED_CACHE_MILLIS = 15 * 60 * 1000; + SimpleTimer.getInstance(aconf).schedule(new BulkImportCacheCleaner(this), CLEANUP_BULK_LOADED_CACHE_MILLIS, CLEANUP_BULK_LOADED_CACHE_MILLIS); + HostAndPort masterHost; while (!serverStopRequested) { // send all of the pending messages http://git-wip-us.apache.org/repos/asf/accumulo/blob/8efc9546/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java new file mode 100644 index 0000000..fff2be2 --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver.tablet; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator; +import org.apache.accumulo.tserver.TabletServer; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BulkImportCacheCleaner implements Runnable { + + private static final Logger log = LoggerFactory.getLogger(BulkImportCacheCleaner.class); + private final TabletServer server; + + public BulkImportCacheCleaner(TabletServer server) { + this.server = server; + } + + @Override + public void run() { + // gather the list of transactions the tablets have cached + final Set<Long> tids = new HashSet<>(); + for (Tablet tablet : server.getOnlineTablets()) { + tids.addAll(tablet.getBulkIngestedFiles().keySet()); + } + try { + // get the current transactions from ZooKeeper + final Set<Long> allTransactionsAlive = ZooArbitrator.allTransactionsAlive(Constants.BULK_ARBITRATOR_TYPE); + // remove any that are still alive + tids.removeAll(allTransactionsAlive); + // cleanup any memory of these transactions + for (Tablet tablet : server.getOnlineTablets()) { + tablet.cleanupBulkLoadedFiles(tids); + } + } catch (KeeperException | InterruptedException e) { + // we'll just clean it up again later + log.debug("Error reading bulk import live transactions {}", e.toString()); + } + } + +} 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/8efc9546/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 5de3236..7eb2069 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -41,7 +41,7 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -253,7 +253,7 @@ public class Tablet implements TabletCommitter { private final ConfigurationObserver configObserver; - private final Cache<Long,List<FileRef>> bulkImported = CacheBuilder.newBuilder().expireAfterAccess(4, TimeUnit.HOURS).build(); + private final Cache<Long,List<FileRef>> bulkImported = CacheBuilder.newBuilder().build(); private final int logId; @@ -586,7 +586,7 @@ public class Tablet implements TabletCommitter { // Force a load of any per-table properties configObserver.propertiesChanged(); for (Long key : bulkImported.keys()) { - this.bulkImported.put(key, new ArrayList<FileRef>(bulkImported.get(key))); + this.bulkImported.put(key, new CopyOnWriteArrayList<FileRef>(bulkImported.get(key))); } if (!logEntries.isEmpty()) { @@ -2792,4 +2792,14 @@ public class Tablet implements TabletCommitter { } } + public Map<Long, List<FileRef>> getBulkIngestedFiles() { + return new HashMap<Long, List<FileRef>>(bulkImported.asMap()); + } + + public void cleanupBulkLoadedFiles(Set<Long> tids) { + for (Long tid : 
tids) { + bulkImported.invalidate(tid); + } + } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/8efc9546/test/src/test/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java b/test/src/test/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java index 05c907c..5f670cc 100644 --- a/test/src/test/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java +++ b/test/src/test/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java @@ -68,7 +68,7 @@ public class FastBulkImportIT extends ConfigurableMacIT { } c.tableOperations().addSplits(tableName, splits); - log.info("Creating bulk import files"); + log.info("Creating lots of bulk import files"); FileSystem fs = getCluster().getFileSystem(); Path basePath = getCluster().getTemporaryPath(); CachedConfiguration.setInstance(fs.getConf());