Author: jbellis Date: Thu Nov 24 20:21:04 2011 New Revision: 1205971 URL: http://svn.apache.org/viewvc?rev=1205971&view=rev Log: merge #3440 from 1.0
Modified: cassandra/trunk/ (props changed) cassandra/trunk/CHANGES.txt cassandra/trunk/contrib/ (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed) cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java Propchange: cassandra/trunk/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Nov 24 20:21:04 2011 @@ -4,7 +4,7 @@ /cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1198724,1198726-1205453 /cassandra/branches/cassandra-0.8.0:1125021-1130369 /cassandra/branches/cassandra-0.8.1:1101014-1125018 -/cassandra/branches/cassandra-1.0:1167085-1205476 +/cassandra/branches/cassandra-1.0:1167085-1205970 /cassandra/branches/cassandra-1.0.0:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1:1102511-1125020 Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1205971&r1=1205970&r2=1205971&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Thu Nov 24 20:21:04 2011 @@ -12,6 +12,9 @@ 1.0.4 + * fix self-hinting of timed out read repair updates and make hinted handoff + less prone to OOMing a coordinator (CASSANDRA-3440) + * expose bloom filter sizes via JMX (CASSANDRA-3495) * enforce RP tokens 0..2**127 (CASSANDRA-3501) * canonicalize paths exposed through JMX (CASSANDRA-3504) * fix "liveSize" stat when sstables are removed (CASSANDRA-3496) @@ -23,7 +26,6 @@ Merged from 0.8: * fix concurrence issue in the FailureDetector (CASSANDRA-3519) * fix array out of bounds error in counter shard removal (CASSANDRA-3514) -Merged from 0.8: * avoid dropping tombstones when they might still be needed to shadow data in a different sstable (CASSANDRA-2786) Propchange: cassandra/trunk/contrib/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Nov 24 20:21:04 2011 @@ -4,7 +4,7 @@ /cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1198724,1198726-1205453 /cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369 /cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018 -/cassandra/branches/cassandra-1.0/contrib:1167085-1205476 +/cassandra/branches/cassandra-1.0/contrib:1167085-1205970 /cassandra/branches/cassandra-1.0.0/contrib:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1/contrib:1102511-1125020 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Nov 24 20:21:04 2011 @@ -4,7 +4,7 @@ /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1198724,1198726-1205453 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018 -/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1205476 +/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1205970 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1102511-1125020 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Nov 24 20:21:04 2011 @@ -4,7 +4,7 @@ /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1198724,1198726-1205453 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018 -/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1205476 +/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1205970 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1102511-1125020 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Nov 24 20:21:04 2011 @@ -4,7 +4,7 @@ /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1198724,1198726-1205453 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018 -/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1205476 +/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1205970 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1102511-1125020 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Nov 24 20:21:04 2011 @@ -4,7 +4,7 @@ /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1198724,1198726-1205453 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018 -/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1205476 +/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1205970 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1102511-1125020 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Nov 24 20:21:04 2011 @@ -4,7 +4,7 @@ /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1198724,1198726-1205453 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018 -/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1205476 +/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1205970 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1102511-1125020 Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1205971&r1=1205970&r2=1205971&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Nov 24 20:21:04 2011 @@ -1647,7 +1647,13 @@ public class ColumnFamilyStore implement return data.getRecentBloomFilterFalseRatio(); } - + public long getBloomFilterDiskSpaceUsed() + { + long total = 0; + for (SSTableReader sst : getSSTables()) + total += sst.getBloomFilterSerializedSize(); + return total; + } @Override public String toString() Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=1205971&r1=1205970&r2=1205971&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java Thu Nov 24 20:21:04 2011 @@ -184,6 +184,8 @@ public interface ColumnFamilyStoreMBean public double getRecentBloomFilterFalseRatio(); + public long getBloomFilterDiskSpaceUsed(); + /** * Gets the minimum number of sstables in queue before compaction kicks off */ Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1205971&r1=1205970&r2=1205971&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Thu Nov 24 20:21:04 2011 @@ -263,10 +263,10 @@ public class RowMutation implements IMut public Message getMessage(Integer version) throws IOException { - return makeRowMutationMessage(StorageService.Verb.MUTATION, version); + return getMessage(StorageService.Verb.MUTATION, version); } - public Message makeRowMutationMessage(StorageService.Verb verb, int version) throws IOException + public Message getMessage(StorageService.Verb verb, int version) throws IOException { return new Message(FBUtilities.getBroadcastAddress(), verb, getSerializedBuffer(version), version); } Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1205971&r1=1205970&r2=1205971&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Thu Nov 24 20:21:04 2011 @@ -431,6 +431,14 @@ public class SSTableReader extends SSTab return bf; } + public long getBloomFilterSerializedSize() + { + if (descriptor.usesOldBloomFilter) + return LegacyBloomFilter.serializer().serializedSize((LegacyBloomFilter) bf); + else + return BloomFilter.serializer().serializedSize((BloomFilter) bf); + } + /** * @return An estimate of the number of keys in this SSTable. */ Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1205971&r1=1205970&r2=1205971&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Thu Nov 24 20:21:04 2011 @@ -27,7 +27,6 @@ import java.nio.channels.AsynchronousClo import java.nio.channels.ServerSocketChannel; import java.util.*; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -36,7 +35,6 @@ import javax.management.ObjectName; import com.google.common.base.Function; import com.google.common.collect.Lists; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -170,9 +168,16 @@ public final class MessagingService impl if (expiredCallbackInfo.shouldHint()) { - // Trigger hints for expired mutation message. assert expiredCallbackInfo.message != null; - scheduleMutationHint(expiredCallbackInfo.message, expiredCallbackInfo.target); + try + { + RowMutation rm = RowMutation.fromBytes(expiredCallbackInfo.message.getMessageBody(), expiredCallbackInfo.message.getVersion()); + return StorageProxy.scheduleLocalHint(rm, expiredCallbackInfo.target, null, null); + } + catch (IOException e) + { + logger_.error("Unable to deserialize mutation when writting hint for: " + expiredCallbackInfo.target); + } } return null; @@ -192,21 +197,6 @@ public final class MessagingService impl } } - - private Future<?> scheduleMutationHint(Message mutationMessage, InetAddress mutationTarget) - { - try - { - RowMutation rm = RowMutation.fromBytes(mutationMessage.getMessageBody(), mutationMessage.getVersion()); - return StorageProxy.scheduleLocalHint(rm, mutationTarget, null, null); - } - catch (IOException e) - { - logger_.error("Unable to deserialize mutation when writting hint for: " + mutationTarget); - } - return null; - } - /** * Track latency information for the dynamic snitch * @param cb the callback associated with this message -- this lets us know if it's a message type we're interested in Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java?rev=1205971&r1=1205970&r2=1205971&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java Thu Nov 24 20:21:04 2011 @@ -124,7 +124,10 @@ public class RowRepairResolver extends A Message repairMessage; try { - repairMessage = rowMutation.getMessage(Gossiper.instance.getVersion(endpoints.get(i))); + // use a separate verb here because we don't want these to be get the white glove hint- + // on-timeout behavior that a "real" mutation gets + repairMessage = rowMutation.getMessage(StorageService.Verb.READ_REPAIR, + Gossiper.instance.getVersion(endpoints.get(i))); } catch (IOException e) { Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1205971&r1=1205970&r2=1205971&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu Nov 24 20:21:04 2011 @@ -30,12 +30,11 @@ import java.util.concurrent.atomic.Atomi import javax.management.MBeanServer; import javax.management.ObjectName; +import com.google.common.base.Function; import com.google.common.collect.HashMultimap; import com.google.common.collect.Iterables; +import com.google.common.collect.MapMaker; import com.google.common.collect.Multimap; - -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.net.*; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -45,6 +44,7 @@ import org.apache.cassandra.concurrent.C import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.dht.AbstractBounds; @@ -59,10 +59,7 @@ import org.apache.cassandra.locator.IEnd import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.net.*; import org.apache.cassandra.thrift.*; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.LatencyTracker; -import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.*; public class StorageProxy implements StorageProxyMBean @@ -86,7 +83,14 @@ public class StorageProxy implements Sto private static volatile boolean hintedHandoffEnabled = DatabaseDescriptor.hintedHandoffEnabled(); private static volatile int maxHintWindow = DatabaseDescriptor.getMaxHintWindow(); private static volatile int maxHintsInProgress = 1024 * Runtime.getRuntime().availableProcessors(); - private static final AtomicInteger hintsInProgress = new AtomicInteger(); + private static final AtomicInteger totalHintsInProgress = new AtomicInteger(); + private static final Map<InetAddress, AtomicInteger> hintsInProgress = new MapMaker().concurrencyLevel(1).makeComputingMap(new Function<InetAddress, AtomicInteger>() + { + public AtomicInteger apply(InetAddress inetAddress) + { + return new AtomicInteger(0); + } + }); private static final AtomicLong totalHints = new AtomicLong(); private StorageProxy() {} @@ -286,10 +290,19 @@ public class StorageProxy implements Sto for (InetAddress destination : targets) { - if (FailureDetector.instance.isAlive(destination)) + // avoid OOMing due to excess hints. we need to do this check even for "live" nodes, since we can + // still generate hints for those if it's overloaded or simply dead but not yet known-to-be-dead. + // The idea is that if we have over maxHintsInProgress hints in flight, this is probably due to + // a small number of nodes causing problems, so we should avoid shutting down writes completely to + // healthy nodes. Any node with no hintsInProgress is considered healthy. + if (totalHintsInProgress.get() > maxHintsInProgress + && (hintsInProgress.get(destination).get() > 0 && shouldHint(destination))) { - String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination); + throw new TimeoutException(); + } + if (FailureDetector.instance.isAlive(destination)) + { if (destination.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS) { insertLocal(rm, responseHandler); @@ -300,6 +313,7 @@ public class StorageProxy implements Sto if (logger.isDebugEnabled()) logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination); + String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination); Multimap<Message, InetAddress> messages = dcMessages.get(dc); if (messages == null) { @@ -315,11 +329,6 @@ public class StorageProxy implements Sto if (!shouldHint(destination)) continue; - // Avoid OOMing from hints waiting to be written. (Unlike ordinary mutations, hint - // not eligible to drop if we fall behind.) - if (hintsInProgress.get() > maxHintsInProgress) - throw new TimeoutException(); - // Schedule a local hint and let the handler know it needs to wait for the hint to complete too Future<Void> hintfuture = scheduleLocalHint(rm, destination, responseHandler, consistency_level); responseHandler.addFutureForHint(new CreationTimeAwareFuture<Void>(hintfuture)); @@ -337,12 +346,13 @@ public class StorageProxy implements Sto { // Hint of itself doesn't make sense. assert !target.equals(FBUtilities.getBroadcastAddress()) : target; - hintsInProgress.incrementAndGet(); + totalHintsInProgress.incrementAndGet(); + final AtomicInteger targetHints = hintsInProgress.get(target); + targetHints.incrementAndGet(); - Runnable runnable = new Runnable() + Runnable runnable = new WrappedRunnable() { - - public void run() + public void runMayThrow() throws IOException { if (logger.isDebugEnabled()) logger.debug("Adding hint for " + target); @@ -360,14 +370,10 @@ public class StorageProxy implements Sto if (responseHandler != null && consistencyLevel == ConsistencyLevel.ANY) responseHandler.response(null); } - catch (IOException e) - { - throw new RuntimeException(e); - } finally { - // Decrement the current hint in the execution after the task is done. - hintsInProgress.decrementAndGet(); + totalHintsInProgress.decrementAndGet(); + targetHints.decrementAndGet(); } } }; @@ -730,6 +736,8 @@ public class StorageProxy implements Sto { ReadCommand command = repairCommands.get(i); RepairCallback handler = repairResponseHandlers.get(i); + // wait for the repair writes to be acknowledged, to minimize impact on any replica that's + // behind on writes in case the out-of-sync row is read multiple times in quick succession FBUtilities.waitOnFutures(handler.resolver.repairResults, DatabaseDescriptor.getRpcTimeout()); Row row; @@ -1279,7 +1287,7 @@ public class StorageProxy implements Sto public int getHintsInProgress() { - return hintsInProgress.get(); + return totalHintsInProgress.get(); } public void verifyNoHintsInProgress() Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1205971&r1=1205970&r2=1205971&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java Thu Nov 24 20:21:04 2011 @@ -464,6 +464,7 @@ public class NodeCmd outs.println("\t\tPending Tasks: " + cfstore.getPendingTasks()); outs.println("\t\tBloom Filter False Postives: " + cfstore.getBloomFilterFalsePositives()); outs.println("\t\tBloom Filter False Ratio: " + String.format("%01.5f", cfstore.getRecentBloomFilterFalseRatio())); + outs.println("\t\tBloom Filter Space Used: " + cfstore.getBloomFilterDiskSpaceUsed()); InstrumentingCacheMBean keyCacheMBean = probe.getKeyCacheMBean(tableName, cfstore.getColumnFamilyName()); if (keyCacheMBean.getCapacity() > 0)