This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 40d954f22b22ff437b938c6bf3607298b30e1088 Merge: d6dfee4 83429b6 Author: Keith Turner <ktur...@apache.org> AuthorDate: Thu Jul 25 14:36:59 2019 -0400 Merge branch '1.9' into 2.0 .../org/apache/accumulo/core/conf/Property.java | 5 + .../apache/accumulo/tserver/BusiestTracker.java | 92 +++++++++++ .../org/apache/accumulo/tserver/TabletServer.java | 35 +++++ .../org/apache/accumulo/tserver/tablet/Tablet.java | 4 + .../accumulo/tserver/BusiestTrackerTest.java | 168 +++++++++++++++++++++ 5 files changed, 304 insertions(+) diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java index 7879ae4,494f6b2..302ce06 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@@ -444,8 -483,13 +444,13 @@@ public enum Property "The minimum number of threads to use to handle incoming requests."), TSERV_THREADCHECK("tserver.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool."), - TSERV_MAX_MESSAGE_SIZE("tserver.server.message.size.max", "1G", PropertyType.MEMORY, + TSERV_MAX_MESSAGE_SIZE("tserver.server.message.size.max", "1G", PropertyType.BYTES, "The maximum size of a message that can be sent to a tablet server."), + TSERV_LOG_BUSY_TABLETS_COUNT("tserver.log.busy.tablets.count", "0", PropertyType.COUNT, + "Number of busiest tablets to log. Logged at interval controlled by " + + "tserver.log.busy.tablets.interval. If <= 0, logging of busy tablets is disabled"), + TSERV_LOG_BUSY_TABLETS_INTERVAL("tserver.log.busy.tablets.interval", "1h", + PropertyType.TIMEDURATION, "Time interval between logging out busy tablets information."), TSERV_HOLD_TIME_SUICIDE("tserver.hold.time.max", "5m", PropertyType.TIMEDURATION, "The maximum time for a tablet server to be in the \"memory full\" state." + " If the tablet server cannot write out memory in this much time, it will" diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/BusiestTracker.java index 0000000,ea60cad..1eaa1d6 mode 000000,100644..100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/BusiestTracker.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/BusiestTracker.java @@@ -1,0 -1,96 +1,92 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.accumulo.tserver; + + import java.util.ArrayList; + import java.util.Collection; + import java.util.Collections; + import java.util.HashMap; + import java.util.List; + import java.util.Map; + -import org.apache.accumulo.core.data.impl.KeyExtent; ++import org.apache.accumulo.core.dataImpl.KeyExtent; + import org.apache.accumulo.core.util.ComparablePair; + import org.apache.accumulo.tserver.tablet.Tablet; + + import com.google.common.collect.Ordering; + + /** + * Computes the N tablets that have the highest deltas for a given monotonically increasing counter. + */ + public abstract class BusiestTracker { + + private Map<KeyExtent,Long> lastCounts = Collections.emptyMap(); + private final int numBusiestTabletsToLog; + + BusiestTracker(int numBusiestTabletsToLog) { + this.numBusiestTabletsToLog = numBusiestTabletsToLog; + } + + protected abstract long extractCount(Tablet tablet); + + public List<ComparablePair<Long,KeyExtent>> computeBusiest(Collection<Tablet> tablets) { + + HashMap<KeyExtent,Long> counts = new HashMap<>(); + + ArrayList<ComparablePair<Long,KeyExtent>> tabletsWithDelta = new ArrayList<>(); + + for (Tablet tablet : tablets) { + KeyExtent extent = tablet.getExtent(); + + // only get the count once to ensure consistency in the case of multiple threads + long count = extractCount(tablet); + + if (count == 0) + continue; + + counts.put(extent, count); + - Long lastCount = lastCounts.get(extent); - if (lastCount == null) - lastCount = 0L; - - long delta = count - lastCount; ++ long delta = count - lastCounts.getOrDefault(extent, 0L); + + // handle case where tablet leaves tserver and returns OR tablet had no activity + if (delta > 0) + tabletsWithDelta.add(new ComparablePair<>(delta, extent)); + } + + lastCounts = counts; + + return Ordering.natural().greatestOf(tabletsWithDelta, numBusiestTabletsToLog); + } + + static BusiestTracker newBusiestIngestTracker(int numBusiestTabletsToLog) { + return new BusiestTracker(numBusiestTabletsToLog) { + @Override + protected long extractCount(Tablet tablet) { + return tablet.totalIngest(); + } + }; + } + + static BusiestTracker newBusiestQueryTracker(int numBusiestTabletsToLog) { + return new BusiestTracker(numBusiestTabletsToLog) { + @Override + protected long extractCount(Tablet tablet) { + return tablet.totalQueries(); + } + }; + } + } diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 1ddec82,73bc58f..d349b7b --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@@ -26,10 -26,12 +26,11 @@@ import java.io.IOException import java.lang.management.ManagementFactory; import java.net.UnknownHostException; import java.nio.ByteBuffer; -import java.security.PrivilegedExceptionAction; +import java.security.SecureRandom; import java.util.ArrayList; import java.util.Arrays; + import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@@ -145,10 -135,14 +146,11 @@@ import org.apache.accumulo.core.tablets import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor; import org.apache.accumulo.core.tabletserver.thrift.TabletStats; -import org.apache.accumulo.core.trace.DistributedTrace; -import org.apache.accumulo.core.trace.Span; -import org.apache.accumulo.core.trace.Trace; +import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.trace.thrift.TInfo; import org.apache.accumulo.core.util.ByteBufferUtil; -import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.core.util.ColumnFQ; + import org.apache.accumulo.core.util.ComparablePair; import org.apache.accumulo.core.util.Daemon; import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.MapCounter; @@@ -348,34 -347,67 +350,67 @@@ public class TabletServer extends Abstr private final ZooAuthenticationKeyWatcher authKeyWatcher; private final WalStateManager walMarker; - public TabletServer(ServerConfigurationFactory confFactory, VolumeManager fs) throws IOException { - super(confFactory); - this.confFactory = confFactory; - this.fs = fs; + public static void main(String[] args) throws Exception { + try (TabletServer tserver = new TabletServer(new ServerOpts(), args)) { + tserver.runServer(); + } + } + + TabletServer(ServerOpts opts, String[] args) { + super("tserver", opts, args); + ServerContext context = super.getContext(); + context.setupCrypto(); + this.masterLockCache = new ZooCache(context.getZooReaderWriter(), null); + this.watcher = new TransactionWatcher(context); + this.confFactory = context.getServerConfFactory(); + this.fs = context.getVolumeManager(); final AccumuloConfiguration aconf = getConfiguration(); - Instance instance = getInstance(); log.info("Version " + Constants.VERSION); - log.info("Instance " + instance.getInstanceID()); + log.info("Instance " + getInstanceID()); this.sessionManager = new SessionManager(aconf); - this.logSorter = new LogSorter(instance, fs, aconf); - this.replWorker = new ReplicationWorker(this, fs); + this.logSorter = new LogSorter(context, fs, aconf); + this.replWorker = new ReplicationWorker(context, fs); this.statsKeeper = new TabletStatsKeeper(); + final int numBusyTabletsToLog = aconf.getCount(Property.TSERV_LOG_BUSY_TABLETS_COUNT); + final long logBusyTabletsDelay = + aconf.getTimeInMillis(Property.TSERV_LOG_BUSY_TABLETS_INTERVAL); + + // This thread will calculate and log out the busiest tablets based on ingest count and + // query count every #{logBusiestTabletsDelay} + if (numBusyTabletsToLog > 0) { + SimpleTimer.getInstance(aconf).schedule(new Runnable() { + private BusiestTracker ingestTracker = + BusiestTracker.newBusiestIngestTracker(numBusyTabletsToLog); + private BusiestTracker queryTracker = + BusiestTracker.newBusiestQueryTracker(numBusyTabletsToLog); + + @Override + public void run() { - - List<Tablet> tablets; - synchronized (onlineTablets) { - tablets = new ArrayList<>(onlineTablets.values()); - } - ++ Collection<Tablet> tablets = onlineTablets.snapshot().values(); + logBusyTablets(ingestTracker.computeBusiest(tablets), "ingest count"); + logBusyTablets(queryTracker.computeBusiest(tablets), "query count"); + } + + private void logBusyTablets(List<ComparablePair<Long,KeyExtent>> busyTablets, + String label) { + + int i = 1; + for (Pair<Long,KeyExtent> pair : busyTablets) { + log.debug("{} busiest tablet by {}: {} -- extent: {} ", i, label.toLowerCase(), + pair.getFirst(), pair.getSecond()); + i++; + } + } + }, logBusyTabletsDelay, logBusyTabletsDelay); + } + - SimpleTimer.getInstance(aconf).schedule(new Runnable() { - @Override - public void run() { - synchronized (onlineTablets) { - long now = System.currentTimeMillis(); - for (Tablet tablet : onlineTablets.values()) - try { - tablet.updateRates(now); - } catch (Exception ex) { - log.error("Error updating rates for {}", tablet.getExtent(), ex); - } + SimpleTimer.getInstance(aconf).schedule(() -> { + long now = System.currentTimeMillis(); + for (Tablet tablet : getOnlineTablets().values()) { + try { + tablet.updateRates(now); + } catch (Exception ex) { + log.error("Error updating rates for {}", tablet.getExtent(), ex); } } }, 5000, 5000); diff --cc server/tserver/src/test/java/org/apache/accumulo/tserver/BusiestTrackerTest.java index 0000000,8e63dfe..40bc38e mode 000000,100644..100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/BusiestTrackerTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/BusiestTrackerTest.java @@@ -1,0 -1,166 +1,168 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.accumulo.tserver; + + import static org.easymock.EasyMock.createMock; + import static org.easymock.EasyMock.expect; + import static org.easymock.EasyMock.replay; + import static org.junit.Assert.assertEquals; + import static org.junit.Assert.assertTrue; + + import java.util.ArrayList; + import java.util.Collection; + import java.util.HashSet; + import java.util.List; + import java.util.Set; + -import org.apache.accumulo.core.data.impl.KeyExtent; ++import org.apache.accumulo.core.data.TableId; ++import org.apache.accumulo.core.dataImpl.KeyExtent; + import org.apache.accumulo.core.util.ComparablePair; + import org.apache.accumulo.tserver.tablet.Tablet; + import org.junit.Test; + + import com.google.common.base.Preconditions; + + public class BusiestTrackerTest { + + private static BusiestTracker newTestTracker(int numToLog) { + return new BusiestTracker(3) { + + @Override + protected long extractCount(Tablet tablet) { + return tablet.totalIngest(); + } + }; + } + + private static Collection<Tablet> createTablets(Object... testData) { + Preconditions.checkArgument(testData.length % 2 == 0); + + List<Tablet> data = new ArrayList<>(); + + for (int i = 0; i < testData.length; i += 2) { + String tableId = (String) testData[i]; + long count = (Long) testData[i + 1]; + + Tablet tablet = createMock(Tablet.class); + - expect(tablet.getExtent()).andReturn(new KeyExtent(tableId, null, null)).anyTimes(); ++ expect(tablet.getExtent()).andReturn(new KeyExtent(TableId.of(tableId), null, null)) ++ .anyTimes(); + expect(tablet.totalIngest()).andReturn(count).anyTimes(); + replay(tablet); + + data.add(tablet); + } + + return data; + } + + private static ComparablePair<Long,KeyExtent> newPair(String tableId, long count) { - return new ComparablePair<>(count, new KeyExtent(tableId, null, null)); ++ return new ComparablePair<>(count, new KeyExtent(TableId.of(tableId), null, null)); + } + + private static Set<ComparablePair<Long,KeyExtent>> createExpected(Object... testData) { + Preconditions.checkArgument(testData.length % 2 == 0); + + Set<ComparablePair<Long,KeyExtent>> expected = new HashSet<>(); + + for (int i = 0; i < testData.length; i += 2) { + String tableId = (String) testData[i]; + long count = (Long) testData[i + 1]; + + expected.add(newPair(tableId, count)); + } + + return expected; + } + + @Test + public void testNoChange() { + BusiestTracker tracker = newTestTracker(3); + + Collection<Tablet> data1 = createTablets("e1", 5L, "e2", 0L); + Collection<ComparablePair<Long,KeyExtent>> busy1 = tracker.computeBusiest(data1); + assertEquals(createExpected("e1", 5L), new HashSet<>(busy1)); + + Collection<ComparablePair<Long,KeyExtent>> busy2 = tracker.computeBusiest(data1); + assertTrue(busy2.isEmpty()); + + Collection<Tablet> data2 = createTablets("e1", 5L, "e2", 2L, "e3", 3L, "e4", 6L); + Collection<ComparablePair<Long,KeyExtent>> busy3 = tracker.computeBusiest(data2); + assertEquals(createExpected("e2", 2L, "e3", 3L, "e4", 6L), new HashSet<>(busy3)); + + Collection<ComparablePair<Long,KeyExtent>> busy4 = tracker.computeBusiest(data2); + assertTrue(busy4.isEmpty()); + } + + @Test + public void testTruncate() { + BusiestTracker tracker = newTestTracker(3); + + Collection<Tablet> data1 = createTablets("e1", 5L, "e2", 6L, "e3", 7L, "e4", 8L, "e5", 8L); + Collection<ComparablePair<Long,KeyExtent>> busy1 = tracker.computeBusiest(data1); + assertEquals(createExpected("e3", 7L, "e4", 8L, "e5", 8L), new HashSet<>(busy1)); + + Collection<Tablet> data2 = + createTablets("e1", 100L, "e2", 100L, "e3", 100L, "e4", 100L, "e5", 100L); + Collection<ComparablePair<Long,KeyExtent>> busy2 = tracker.computeBusiest(data2); + assertEquals(createExpected("e1", 100L - 5L, "e2", 100L - 6L, "e3", 100L - 7L), + new HashSet<>(busy2)); + + Collection<Tablet> data3 = + createTablets("e1", 112L, "e2", 110L, "e3", 111L, "e4", 112L, "e5", 110L, "e6", 50L); + Collection<ComparablePair<Long,KeyExtent>> busy3 = tracker.computeBusiest(data3); + assertEquals(createExpected("e1", 112L - 100L, "e4", 112L - 100L, "e6", 50L), + new HashSet<>(busy3)); + } + + @Test + public void testReload() { + BusiestTracker tracker = newTestTracker(3); + + Collection<Tablet> data1 = createTablets("e1", 5L, "e2", 3L); + Collection<ComparablePair<Long,KeyExtent>> busy1 = tracker.computeBusiest(data1); + assertEquals(createExpected("e1", 5L, "e2", 3L), new HashSet<>(busy1)); + + // when count is less than previously seen tablet should be ignored + Collection<Tablet> data2 = createTablets("e1", 7L, "e2", 1L); + Collection<ComparablePair<Long,KeyExtent>> busy2 = tracker.computeBusiest(data2); + assertEquals(createExpected("e1", 2L), new HashSet<>(busy2)); + + Collection<Tablet> data3 = createTablets("e1", 8L, "e2", 4L); + Collection<ComparablePair<Long,KeyExtent>> busy3 = tracker.computeBusiest(data3); + assertEquals(createExpected("e1", 1L, "e2", 3L), new HashSet<>(busy3)); + } + + @Test + public void testOrder() { + BusiestTracker tracker = newTestTracker(3); + + Collection<Tablet> data1 = createTablets("e1", 5L, "e2", 6L, "e3", 13L, "e4", 8L, "e5", 9L); + List<ComparablePair<Long,KeyExtent>> busy1 = tracker.computeBusiest(data1); + assertEquals(3, busy1.size()); + assertEquals(newPair("e3", 13L), busy1.get(0)); + assertEquals(newPair("e5", 9L), busy1.get(1)); + assertEquals(newPair("e4", 8L), busy1.get(2)); + + Collection<Tablet> data2 = createTablets("e1", 15L, "e2", 17L, "e3", 13L, "e4", 20L, "e5", 9L); + List<ComparablePair<Long,KeyExtent>> busy2 = tracker.computeBusiest(data2); + assertEquals(3, busy2.size()); + assertEquals(newPair("e4", 12L), busy2.get(0)); + assertEquals(newPair("e2", 11L), busy2.get(1)); + assertEquals(newPair("e1", 10L), busy2.get(2)); + } + }