Merge branch 'cassandra-1.2' into trunk Conflicts: src/java/org/apache/cassandra/db/HintedHandOffManager.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/076a0e4e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/076a0e4e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/076a0e4e Branch: refs/heads/trunk Commit: 076a0e4e78793a11b5be78e3eb592514b7fc0d79 Parents: 08721da d224c2b Author: Aleksey Yeschenko <alek...@apache.org> Authored: Wed Mar 20 20:09:31 2013 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Wed Mar 20 20:09:31 2013 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/HintedHandOffManager.java | 178 ++++++++------- 2 files changed, 97 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/076a0e4e/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 681fb44,3eb85d6..3309ab9 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -22,9 -2,9 +22,10 @@@ * Fix mixing prepared statements between keyspaces (CASSANDRA-5352) * Fix consistency level during bootstrap - strike 3 (CASSANDRA-5354) * Fix transposed arguments in AlreadyExistsException (CASSANDRA-5362) + * Improve asynchronous hint delivery (CASSANDRA-5179) + 1.2.3 * add check for sstable overlap within a level on startup (CASSANDRA-5327) * replace ipv6 colons in jmx object names (CASSANDRA-5298, 5328) http://git-wip-us.apache.org/repos/asf/cassandra/blob/076a0e4e/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/HintedHandOffManager.java index db85efb,53411f5..cb306ae --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@@ -57,8 -58,7 +59,6 @@@ import org.apache.cassandra.metrics.Hin import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.*; --import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; @@@ -108,31 -107,11 +107,34 @@@ public class HintedHandOffManager imple Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), - new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY), "internal"); + new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY), + "internal"); + + private final ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.HINTS_CF); + /** + * Returns a mutation representing a Hint to be sent to <code>targetId</code> + * as soon as it becomes available again. + */ + public static RowMutation hintFor(RowMutation mutation, UUID targetId) throws IOException + { + UUID hintId = UUIDGen.getTimeUUID(); + + // The hint TTL is set at the smallest GCGraceSeconds for any of the CFs in the RM; + // this ensures that deletes aren't "undone" by delivery of an old hint + int ttl = Integer.MAX_VALUE; + for (ColumnFamily cf : mutation.getColumnFamilies()) + ttl = Math.min(ttl, cf.metadata().getGcGraceSeconds()); + + // serialize the hint with id and version as a composite column name + ByteBuffer name = comparator.decompose(hintId, MessagingService.current_version); + ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, RowMutation.serializer, MessagingService.current_version)); + ColumnFamily cf = ColumnFamily.create(Schema.instance.getCFMetaData(Table.SYSTEM_KS, SystemTable.HINTS_CF)); + cf.addColumn(name, value, System.currentTimeMillis(), ttl); + + return new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(targetId), cf); + } + public void start() { MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); @@@ -157,10 -136,10 +159,10 @@@ StorageService.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES); } - private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer columnName, long timestamp) throws IOException + private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer columnName, long timestamp) { RowMutation rm = new RowMutation(Table.SYSTEM_KS, tokenBytes); - rm.delete(new QueryPath(SystemTable.HINTS_CF, null, columnName), timestamp); + rm.delete(SystemTable.HINTS_CF, columnName, timestamp); rm.applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery } @@@ -339,37 -302,40 +325,40 @@@ int throttleInKB = DatabaseDescriptor.getHintedHandoffThrottleInKB(); RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024); + delivery: while (true) { - // check if hints delivery has been paused during the process - if (hintedHandOffPaused) - { - logger.debug("Hints delivery process is paused, aborting"); - break; - } + QueryFilter filter = QueryFilter.getSliceFilter(epkey, - new QueryPath(SystemTable.HINTS_CF), ++ SystemTable.HINTS_CF, + startColumn, + ByteBufferUtil.EMPTY_BYTE_BUFFER, + false, + pageSize); - QueryFilter filter = QueryFilter.getSliceFilter(epkey, SystemTable.HINTS_CF, startColumn, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, pageSize); + ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), + (int) (System.currentTimeMillis() / 1000)); - ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), (int)(System.currentTimeMillis() / 1000)); if (pagingFinished(hintsPage, startColumn)) - { - if (ByteBufferUtil.EMPTY_BYTE_BUFFER.equals(startColumn)) - { - // we've started from the beginning and could not find anything (only maybe tombstones) - break; - } - else - { - // restart query from the first column until we read an empty row; - // that will tell us everything was delivered successfully with no timeouts - startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER; - continue; - } + break; + // check if node is still alive and we should continue delivery process + if (!FailureDetector.instance.isAlive(endpoint)) + { + logger.info("Endpoint {} died during hint delivery; aborting ({} delivered)", endpoint, rowsReplayed); + return; } + List<WriteResponseHandler> responseHandlers = Lists.newArrayList(); + - for (final IColumn hint : hintsPage.getSortedColumns()) + for (final Column hint : hintsPage.getSortedColumns()) { + // check if hints delivery has been paused during the process + if (hintedHandOffPaused) + { + logger.debug("Hints delivery process is paused, aborting"); + break delivery; + } + // Skip tombstones: // if we iterate quickly enough, it's possible that we could request a new page in the same millisecond // in which the local deletion timestamp was generated on the last column in the old page, in which