Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	CHANGES.txt
	src/java/org/apache/cassandra/db/BatchlogManager.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/440824c1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/440824c1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/440824c1

Branch: refs/heads/trunk
Commit: 440824c1a60a344bc3e8a5ad35ae2fac879bd61d
Parents: 014d328 e916dff
Author: Aleksey Yeschenko <alek...@apache.org>
Authored: Fri Oct 17 03:40:17 2014 +0300
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Fri Oct 17 03:40:17 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/BatchlogManager.java    | 64 ++++++++++----------
 .../cassandra/service/StorageService.java       | 25 ++++++--
 .../cassandra/db/BatchlogManagerTest.java       |  8 +--
 4 files changed, 57 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/440824c1/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b40e14b,73aaab0..d7a8904
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,89 -1,5 +1,90 @@@
-2.0.11: +2.1.1 + * Fix IllegalArgumentException when a list of IN values containing tuples + is passed as a single arg to a prepared statement with the v1 or v2 + protocol (CASSANDRA-8062) + * Fix ClassCastException in DISTINCT query on static columns with + query paging (CASSANDRA-8108) + * Fix NPE on null nested UDT inside a set (CASSANDRA-8105) + * Fix exception when querying secondary index on set items or map keys + when some clustering columns are specified (CASSANDRA-8073) + * Send proper error response when there is an error during native + protocol message decode (CASSANDRA-8118) + * Gossip should ignore generation numbers too far in the future (CASSANDRA-8113) + * Fix NPE when creating a table with frozen sets, lists (CASSANDRA-8104) + * Fix high memory use due to tracking
reads on incrementally opened sstable + readers (CASSANDRA-8066) + * Fix EXECUTE request with skipMetadata=false returning no metadata + (CASSANDRA-8054) + * Allow concurrent use of CQLBulkOutputFormat (CASSANDRA-7776) + * Shutdown JVM on OOM (CASSANDRA-7507) + * Upgrade netty version and enable epoll event loop (CASSANDRA-7761) + * Don't duplicate sstables smaller than split size when using + the sstablesplitter tool (CASSANDRA-7616) + * Avoid re-parsing already prepared statements (CASSANDRA-7923) + * Fix some Thrift slice deletions and updates of COMPACT STORAGE + tables with some clustering columns omitted (CASSANDRA-7990) + * Fix filtering for CONTAINS on sets (CASSANDRA-8033) + * Properly track added size (CASSANDRA-7239) + * Allow compilation in java 8 (CASSANDRA-7208) + * Fix Assertion error on RangeTombstoneList diff (CASSANDRA-8013) + * Release references to overlapping sstables during compaction (CASSANDRA-7819) + * Send notification when opening compaction results early (CASSANDRA-8034) + * Make native server start block until properly bound (CASSANDRA-7885) + * (cqlsh) Fix IPv6 support (CASSANDRA-7988) + * Ignore fat clients when checking for endpoint collision (CASSANDRA-7939) + * Make sstablerepairedset take a list of files (CASSANDRA-7995) + * (cqlsh) Tab completeion for indexes on map keys (CASSANDRA-7972) + * (cqlsh) Fix UDT field selection in select clause (CASSANDRA-7891) + * Fix resource leak in event of corrupt sstable + * (cqlsh) Add command line option for cqlshrc file path (CASSANDRA-7131) + * Provide visibility into prepared statements churn (CASSANDRA-7921, CASSANDRA-7930) + * Invalidate prepared statements when their keyspace or table is + dropped (CASSANDRA-7566) + * cassandra-stress: fix support for NetworkTopologyStrategy (CASSANDRA-7945) + * Fix saving caches when a table is dropped (CASSANDRA-7784) + * Add better error checking of new stress profile (CASSANDRA-7716) + * Use ThreadLocalRandom and remove FBUtilities.threadLocalRandom 
(CASSANDRA-7934) + * Prevent operator mistakes due to simultaneous bootstrap (CASSANDRA-7069) + * cassandra-stress supports whitelist mode for node config (CASSANDRA-7658) + * GCInspector more closely tracks GC; cassandra-stress and nodetool report it (CASSANDRA-7916) + * nodetool won't output bogus ownership info without a keyspace (CASSANDRA-7173) + * Add human readable option to nodetool commands (CASSANDRA-5433) + * Don't try to set repairedAt on old sstables (CASSANDRA-7913) + * Add metrics for tracking PreparedStatement use (CASSANDRA-7719) + * (cqlsh) tab-completion for triggers (CASSANDRA-7824) + * (cqlsh) Support for query paging (CASSANDRA-7514) + * (cqlsh) Show progress of COPY operations (CASSANDRA-7789) + * Add syntax to remove multiple elements from a map (CASSANDRA-6599) + * Support non-equals conditions in lightweight transactions (CASSANDRA-6839) + * Add IF [NOT] EXISTS to create/drop triggers (CASSANDRA-7606) + * (cqlsh) Display the current logged-in user (CASSANDRA-7785) + * (cqlsh) Don't ignore CTRL-C during COPY FROM execution (CASSANDRA-7815) + * (cqlsh) Order UDTs according to cross-type dependencies in DESCRIBE + output (CASSANDRA-7659) + * (cqlsh) Fix handling of CAS statement results (CASSANDRA-7671) + * (cqlsh) COPY TO/FROM improvements (CASSANDRA-7405) + * Support list index operations with conditions (CASSANDRA-7499) + * Add max live/tombstoned cells to nodetool cfstats output (CASSANDRA-7731) + * Validate IPv6 wildcard addresses properly (CASSANDRA-7680) + * (cqlsh) Error when tracing query (CASSANDRA-7613) + * Avoid IOOBE when building SyntaxError message snippet (CASSANDRA-7569) + * SSTableExport uses correct validator to create string representation of partition + keys (CASSANDRA-7498) + * Avoid NPEs when receiving type changes for an unknown keyspace (CASSANDRA-7689) + * Add support for custom 2i validation (CASSANDRA-7575) + * Pig support for hadoop CqlInputFormat (CASSANDRA-6454) + * Add listen_interface and rpc_interface options 
(CASSANDRA-7417) + * Improve schema merge performance (CASSANDRA-7444) + * Adjust MT depth based on # of partition validating (CASSANDRA-5263) + * Optimise NativeCell comparisons (CASSANDRA-6755) + * Configurable client timeout for cqlsh (CASSANDRA-7516) + * Include snippet of CQL query near syntax error in messages (CASSANDRA-7111) + * Make repair -pr work with -local (CASSANDRA-7450) + * Fix error in sstableloader with -cph > 1 (CASSANDRA-8007) + * Fix snapshot repair error on indexed tables (CASSANDRA-8020) + * Do not exit nodetool repair when receiving JMX NOTIF_LOST (CASSANDRA-7909) +Merged from 2.0: + * Force batchlog replay before decommissioning a node (CASSANDRA-7446) * Fix hint replay with many accumulated expired hints (CASSANDRA-6998) * Fix duplicate results in DISTINCT queries on static columns with query paging (CASSANDRA-8108) http://git-wip-us.apache.org/repos/asf/cassandra/blob/440824c1/src/java/org/apache/cassandra/db/BatchlogManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/BatchlogManager.java index 7f8d355,48f4c3c..18d9a17 --- a/src/java/org/apache/cassandra/db/BatchlogManager.java +++ b/src/java/org/apache/cassandra/db/BatchlogManager.java @@@ -38,9 -38,11 +37,8 @@@ import org.slf4j.LoggerFactory import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; --import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.marshal.LongType; -import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.WriteTimeoutException; @@@ -117,12 -119,13 +120,13 @@@ public class BatchlogManager implement 
replayAllFailedBatches(); } }; - batchlogTasks.execute(runnable); + // If a replay is already in progress this request will be executed after it completes. + return batchlogTasks.submit(runnable); } - public static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid) + public static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version) { - return getBatchlogMutationFor(mutations, uuid, FBUtilities.timestampMicros()); + return getBatchlogMutationFor(mutations, uuid, version, FBUtilities.timestampMicros()); } @VisibleForTesting @@@ -151,15 -158,11 +155,11 @@@ throw new AssertionError(); // cannot happen. } - return ByteBuffer.wrap(bos.toByteArray()); + return buf.asByteBuffer(); } - @VisibleForTesting - void replayAllFailedBatches() throws ExecutionException, InterruptedException + private void replayAllFailedBatches() throws ExecutionException, InterruptedException { - if (!isReplaying.compareAndSet(false, true)) - return; - logger.debug("Started replayAllFailedBatches"); // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml). @@@ -167,34 -170,27 +167,27 @@@ int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size(); RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? 
Double.MAX_VALUE : throttleInKB * 1024); - try - { - UntypedResultSet page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s LIMIT %d", - Keyspace.SYSTEM_KS, - SystemKeyspace.BATCHLOG_CF, - PAGE_SIZE)); - - while (!page.isEmpty()) - { - UUID id = processBatchlogPage(page, rateLimiter); - UntypedResultSet page = process("SELECT id, data, written_at, version FROM %s.%s LIMIT %d", - Keyspace.SYSTEM_KS, - SystemKeyspace.BATCHLOG_CF, - PAGE_SIZE); ++ UntypedResultSet page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s LIMIT %d", ++ Keyspace.SYSTEM_KS, ++ SystemKeyspace.BATCHLOG_CF, ++ PAGE_SIZE)); - if (page.size() < PAGE_SIZE) - break; // we've exhausted the batchlog, next query would be empty. + while (!page.isEmpty()) + { + UUID id = processBatchlogPage(page, rateLimiter); - page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(?) LIMIT %d", - Keyspace.SYSTEM_KS, - SystemKeyspace.BATCHLOG_CF, - PAGE_SIZE), - id); - } + if (page.size() < PAGE_SIZE) + break; // we've exhausted the batchlog, next query would be empty. - cleanup(); - } - finally - { - isReplaying.set(false); - page = process("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(%s) LIMIT %d", - Keyspace.SYSTEM_KS, - SystemKeyspace.BATCHLOG_CF, - id, - PAGE_SIZE); ++ page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(?) LIMIT %d", ++ Keyspace.SYSTEM_KS, ++ SystemKeyspace.BATCHLOG_CF, ++ PAGE_SIZE), ++ id); } + cleanup(); + logger.debug("Finished replayAllFailedBatches"); } @@@ -215,172 -202,89 +208,177 @@@ { id = row.getUUID("id"); long writtenAt = row.getLong("written_at"); - int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12; // enough time for the actual write + batchlog entry mutation delivery (two separate requests). 
- long timeout = DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation + long timeout = getBatchlogTimeout(); if (System.currentTimeMillis() < writtenAt + timeout) continue; // not ready to replay yet, might still get a deletion. - replayBatch(id, row.getBytes("data"), writtenAt, version, rateLimiter); + + int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12; + Batch batch = new Batch(id, writtenAt, row.getBytes("data"), version); + try + { + if (batch.replay(rateLimiter) > 0) + { + batches.add(batch); + } + else + { + deleteBatch(id); // no write mutations were sent (either expired or all CFs involved truncated). + totalBatchesReplayed.incrementAndGet(); + } + } + catch (IOException e) + { + logger.warn("Skipped batch replay of {} due to {}", id, e); + deleteBatch(id); + } } + + // now waiting for all batches to complete their processing + // schedule hints for timed out deliveries + for (Batch batch : batches) + { + batch.finish(); + deleteBatch(batch.id); + } + + totalBatchesReplayed.addAndGet(batches.size()); + return id; } + public long getBatchlogTimeout() + { + return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation + } + - private void replayBatch(UUID id, ByteBuffer data, long writtenAt, int version, RateLimiter rateLimiter) + private static class Batch { - logger.debug("Replaying batch {}", id); + private final UUID id; + private final long writtenAt; + private final ByteBuffer data; + private final int version; - try + private List<ReplayWriteResponseHandler> replayHandlers; + + public Batch(UUID id, long writtenAt, ByteBuffer data, int version) { - replaySerializedMutations(data, writtenAt, version, rateLimiter); + this.id = id; + this.writtenAt = writtenAt; + this.data = data; + this.version = version; } - catch (IOException e) + + public int replay(RateLimiter rateLimiter) throws IOException { - logger.warn("Skipped 
batch replay of {} due to {}", id, e); - } + logger.debug("Replaying batch {}", id); - deleteBatch(id); + List<Mutation> mutations = replayingMutations(); - totalBatchesReplayed.incrementAndGet(); - } + if (mutations.isEmpty()) + return 0; - private void deleteBatch(UUID id) - { - RowMutation mutation = new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(id)); - mutation.delete(SystemKeyspace.BATCHLOG_CF, FBUtilities.timestampMicros()); - mutation.apply(); - } + int ttl = calculateHintTTL(mutations); + if (ttl <= 0) + return 0; - private void replaySerializedMutations(ByteBuffer data, long writtenAt, int version, RateLimiter rateLimiter) throws IOException - { - DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data)); - int size = in.readInt(); - List<RowMutation> mutations = new ArrayList<>(size); + replayHandlers = sendReplays(mutations, writtenAt, ttl); + + rateLimiter.acquire(data.remaining()); // acquire afterwards, to not mess up ttl calculation. + + return replayHandlers.size(); + } + + public void finish() + { + for (int i = 0; i < replayHandlers.size(); i++) + { + ReplayWriteResponseHandler handler = replayHandlers.get(i); + try + { + handler.get(); + } + catch (WriteTimeoutException e) + { + logger.debug("Timed out replaying a batched mutation to a node, will write a hint"); + // writing hints for the rest to hints, starting from i + writeHintsForUndeliveredEndpoints(i); + return; + } + } + } - for (int i = 0; i < size; i++) + private List<Mutation> replayingMutations() throws IOException { - RowMutation mutation = RowMutation.serializer.deserialize(in, version); + DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data)); + int size = in.readInt(); + List<Mutation> mutations = new ArrayList<>(size); + for (int i = 0; i < size; i++) + { + Mutation mutation = Mutation.serializer.deserialize(in, version); - // Remove CFs that have been truncated since. 
writtenAt and SystemTable#getTruncatedAt() both return millis. - // We don't abort the replay entirely b/c this can be considered a succes (truncated is same as delivered then - // truncated. - for (UUID cfId : mutation.getColumnFamilyIds()) - if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId)) - mutation = mutation.without(cfId); + // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis. + // We don't abort the replay entirely b/c this can be considered a success (truncated is same as delivered then + // truncated. + for (UUID cfId : mutation.getColumnFamilyIds()) + if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId)) + mutation = mutation.without(cfId); - if (!mutation.isEmpty()) - mutations.add(mutation); + if (!mutation.isEmpty()) + mutations.add(mutation); + } + return mutations; } - if (!mutations.isEmpty()) - replayMutations(mutations, writtenAt, version, rateLimiter); - } + private void writeHintsForUndeliveredEndpoints(int startFrom) + { + try + { + // Here we deserialize mutations 2nd time from byte buffer. + // but this is ok, because timeout on batch direct delivery is rare + // (it can happen only several seconds until node is marked dead) + // so trading some cpu to keep less objects + List<Mutation> replayingMutations = replayingMutations(); + for (int i = startFrom; i < replayHandlers.size(); i++) + { + Mutation undeliveredMutation = replayingMutations.get(i); + int ttl = calculateHintTTL(replayingMutations); + ReplayWriteResponseHandler handler = replayHandlers.get(i); - /* - * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints - * when a replica is down or a write request times out. 
- */ - private void replayMutations(List<RowMutation> mutations, long writtenAt, int version, RateLimiter rateLimiter) throws IOException - { - int ttl = calculateHintTTL(mutations, writtenAt); - if (ttl <= 0) - return; // this batchlog entry has 'expired' - - List<InetAddress> liveEndpoints = new ArrayList<>(); - List<InetAddress> hintEndpoints = new ArrayList<>(); - - for (RowMutation mutation : mutations) + if (ttl > 0 && handler != null) + for (InetAddress endpoint : handler.undelivered) + StorageProxy.writeHintForMutation(undeliveredMutation, writtenAt, ttl, endpoint); + } + } + catch (IOException e) + { + logger.error("Cannot schedule hints for undelivered batch", e); + } + } + + private List<ReplayWriteResponseHandler> sendReplays(List<Mutation> mutations, long writtenAt, int ttl) { + List<ReplayWriteResponseHandler> handlers = new ArrayList<>(mutations.size()); + for (Mutation mutation : mutations) + { + ReplayWriteResponseHandler handler = sendSingleReplayMutation(mutation, writtenAt, ttl); + if (handler != null) + handlers.add(handler); + } + return handlers; + } + + /** + * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints + * when a replica is down or a write request times out. 
+ * + * @return direct delivery handler to wait on or null, if no live nodes found + */ + private ReplayWriteResponseHandler sendSingleReplayMutation(final Mutation mutation, long writtenAt, int ttl) + { + Set<InetAddress> liveEndpoints = new HashSet<>(); String ks = mutation.getKeyspaceName(); - Token tk = StorageService.getPartitioner().getToken(mutation.key()); - int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, version); + Token<?> tk = StorageService.getPartitioner().getToken(mutation.key()); for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk), StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks))) http://git-wip-us.apache.org/repos/asf/cassandra/blob/440824c1/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/440824c1/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java ----------------------------------------------------------------------
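The first BatchlogManager hunk replaces `batchlogTasks.execute(runnable)` with `return batchlogTasks.submit(runnable)` and drops the `isReplaying` compare-and-set from `replayAllFailedBatches()`. Previously a second replay request was silently ignored while one was running; now it is queued behind the running one, and the caller gets a `Future` it can wait on, which is presumably what the StorageService change for CASSANDRA-7446 (forcing a batchlog replay before decommission) relies on. A minimal sketch of that behaviour, assuming `batchlogTasks` runs on a single thread as the new comment implies; `ReplayQueueSketch`, `forceReplay` and the counters are hypothetical:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: replay requests are queued on one worker thread instead of being dropped
// while a replay is already running (the old isReplaying compare-and-set behaviour).
class ReplayQueueSketch {
    // Hypothetical stand-in for batchlogTasks, assumed to run on a single thread.
    private final ExecutorService batchlogTasks = Executors.newSingleThreadExecutor();
    private final AtomicInteger running = new AtomicInteger();
    private final AtomicInteger maxConcurrent = new AtomicInteger();
    private final AtomicInteger completed = new AtomicInteger();

    // Hypothetical stand-in for replayAllFailedBatches(): records how many replays overlap.
    private void replayAllFailedBatches() throws InterruptedException {
        maxConcurrent.accumulateAndGet(running.incrementAndGet(), Math::max);
        Thread.sleep(20); // pretend to page through the batchlog
        running.decrementAndGet();
        completed.incrementAndGet();
    }

    // Queues a replay; the returned Future completes once that replay has finished.
    Future<?> forceReplay() {
        return batchlogTasks.submit(() -> {
            replayAllFailedBatches();
            return null;
        });
    }

    int completedCount() { return completed.get(); }
    int maxConcurrent() { return maxConcurrent.get(); }
    void shutdown() { batchlogTasks.shutdown(); }

    public static void main(String[] args) throws Exception {
        ReplayQueueSketch manager = new ReplayQueueSketch();
        Future<?> first = manager.forceReplay();
        Future<?> second = manager.forceReplay(); // queued behind the first, not dropped
        second.get(); // tasks run in FIFO order on one thread, so the first has finished too
        System.out.println(first.isDone() + " " + manager.completedCount() + " " + manager.maxConcurrent());
        manager.shutdown();
    }
}
```

Running `main` prints `true 2 1`: both requests ran, one after the other, and never overlapped.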
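The throttle in `replayAllFailedBatches()` divides `DatabaseDescriptor.getBatchlogReplayThrottleInKB()` by the number of endpoints in the ring, converts the per-node share to bytes per second, and falls back to `Double.MAX_VALUE` (no limit) when the share is 0. The arithmetic can be checked in isolation; `perNodeRateBytes` below is a hypothetical helper mirroring that expression, not part of BatchlogManager:

```java
// Sketch of the per-node replay throttle arithmetic; perNodeRateBytes is a hypothetical helper.
class ReplayThrottleSketch {
    // Mirrors replayAllFailedBatches(): split the configured KB/s across all endpoints,
    // then convert to bytes/s; a share of 0 means "no limit" (Double.MAX_VALUE).
    static double perNodeRateBytes(int batchlogReplayThrottleInKB, int endpointCount) {
        int throttleInKB = batchlogReplayThrottleInKB / endpointCount; // integer division truncates
        return throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024;
    }

    public static void main(String[] args) {
        System.out.println(perNodeRateBytes(1024, 4));                  // 256 KB/s per node, in bytes
        System.out.println(perNodeRateBytes(0, 4) == Double.MAX_VALUE); // 0 in cassandra.yaml disables it
        System.out.println(perNodeRateBytes(3, 4) == Double.MAX_VALUE); // 3 / 4 truncates to 0
    }
}
```

`main` prints `262144.0`, `true` and `true`. Because the division is integer division, a configured throttle smaller than the number of endpoints also yields 0 and therefore disables throttling, not only an explicit 0.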
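The merged paging loop reads `system.batchlog` in pages of `PAGE_SIZE` rows, resumes with `WHERE token(id) > token(?)` bound to the last `id` seen (2.1 binds it as a parameter where 2.0 formatted it into the query string), and stops as soon as a page comes back short. Within a page, rows younger than `getBatchlogTimeout()` (twice the write RPC timeout) are skipped because their coordinator may still delete them. The sketch models the table as a `TreeMap` keyed by token with `written_at` as the value; the class, `fetchPage` and the tiny page size are assumptions for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch of the batchlog paging loop; the table, page size and helper names are hypothetical.
class BatchlogPagingSketch {
    static final int PAGE_SIZE = 3; // tiny page so the example spans several pages

    // Stand-in for system.batchlog: token(id) -> written_at in milliseconds.
    final NavigableMap<Long, Long> table = new TreeMap<>();

    // Mimics "SELECT ... [WHERE token(id) > token(?)] LIMIT PAGE_SIZE".
    List<Map.Entry<Long, Long>> fetchPage(Long afterToken) {
        Map<Long, Long> rows = afterToken == null ? table : table.tailMap(afterToken, false);
        List<Map.Entry<Long, Long>> page = new ArrayList<>();
        for (Map.Entry<Long, Long> row : rows.entrySet()) {
            if (page.size() == PAGE_SIZE)
                break;
            page.add(row);
        }
        return page;
    }

    // Returns the tokens that would be replayed, skipping rows younger than timeoutMillis.
    List<Long> replayAll(long nowMillis, long timeoutMillis) {
        List<Long> replayed = new ArrayList<>();
        List<Map.Entry<Long, Long>> page = fetchPage(null);
        while (!page.isEmpty()) {
            Long last = null;
            for (Map.Entry<Long, Long> row : page) {
                last = row.getKey(); // updated before the readiness check, like `id`
                if (nowMillis < row.getValue() + timeoutMillis)
                    continue; // not ready to replay yet, might still get a deletion
                replayed.add(row.getKey());
            }
            if (page.size() < PAGE_SIZE)
                break; // batchlog exhausted, the next query would be empty
            page = fetchPage(last);
        }
        return replayed;
    }

    public static void main(String[] args) {
        BatchlogPagingSketch batchlog = new BatchlogPagingSketch();
        for (long token = 1; token <= 7; token++)
            batchlog.table.put(token, token == 5 ? 9_500L : 1_000L); // token 5 written recently
        System.out.println(batchlog.replayAll(10_000L, 2_000L));
    }
}
```

`main` prints `[1, 2, 3, 4, 6, 7]`. As in `processBatchlogPage`, the cursor is updated before the readiness check, so a skipped row still advances paging.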
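`Batch.replayingMutations()` deserializes each mutation and removes every table truncated at or after `writtenAt`, treating that as a successful delivery; a mutation left empty is dropped, and if nothing remains `replay()` returns 0, so the batch is deleted without sending anything. The filter is sketched below with hypothetical string table ids instead of `UUID`s, and with never-truncated tables treated as truncated at `Long.MIN_VALUE`, which is an assumption of the sketch:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the truncation filter; string table ids and the data model are hypothetical.
class TruncationFilterSketch {
    // Each mutation maps a table id to its payload. Tables truncated at or after writtenAt
    // are removed; mutations left with no tables are dropped entirely.
    static List<Map<String, String>> replayable(List<Map<String, String>> mutations,
                                                long writtenAt,
                                                Map<String, Long> truncatedAt) {
        List<Map<String, String>> result = new ArrayList<>();
        for (Map<String, String> mutation : mutations) {
            // Work on a copy; the diff reassigns mutation = mutation.without(cfId).
            Map<String, String> kept = new HashMap<>(mutation);
            // Assumption: a table that was never truncated reports Long.MIN_VALUE.
            kept.keySet().removeIf(cfId -> writtenAt <= truncatedAt.getOrDefault(cfId, Long.MIN_VALUE));
            if (!kept.isEmpty())
                result.add(kept);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> first = new HashMap<>();
        first.put("users", "row-a");
        first.put("events", "row-b");
        Map<String, String> second = new HashMap<>();
        second.put("events", "row-c");
        Map<String, Long> truncatedAt = new HashMap<>();
        truncatedAt.put("events", 5_000L); // truncated after the batch was written at 4000
        System.out.println(replayable(Arrays.asList(first, second), 4_000L, truncatedAt));
    }
}
```

`main` prints `[{users=row-a}]`: the `events` part of the first mutation and the whole second mutation are discarded.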
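The new `Batch` class splits replay into two phases. `replay()` sends every mutation of a batch and keeps the returned `ReplayWriteResponseHandler`s; only after the whole page has been sent does `finish()` wait on them in order. On the first `WriteTimeoutException` it stops waiting and writes hints for the `undelivered` endpoints of that handler and of every later one, so replicas that already acknowledged are not hinted. A simplified sketch of that waiting logic, assuming a `CompletableFuture` with a timeout in place of the handler and returning hint descriptions instead of calling `StorageProxy.writeHintForMutation`; `Handler`, `ack` and `finish` here are hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch of Batch.finish(): wait on handlers in send order, hint the rest after the first timeout.
class BatchFinishSketch {
    // Hypothetical stand-in for ReplayWriteResponseHandler. Not thread-safe: in this sketch
    // acks are applied on the calling thread before finish() runs.
    static final class Handler {
        final Set<String> undelivered;
        final CompletableFuture<Void> done = new CompletableFuture<>();

        Handler(String... targets) {
            undelivered = new TreeSet<>(Arrays.asList(targets));
        }

        void ack(String endpoint) {
            undelivered.remove(endpoint);
            if (undelivered.isEmpty())
                done.complete(null);
        }
    }

    // Returns "mutation<i>-><endpoint>" for every hint that would be written.
    static List<String> finish(List<Handler> handlers, long timeoutMillis)
            throws InterruptedException, ExecutionException {
        List<String> hints = new ArrayList<>();
        for (int i = 0; i < handlers.size(); i++) {
            try {
                handlers.get(i).done.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // Like writeHintsForUndeliveredEndpoints(i): hint this handler and all later ones.
                for (int j = i; j < handlers.size(); j++)
                    for (String endpoint : handlers.get(j).undelivered)
                        hints.add("mutation" + j + "->" + endpoint);
                return hints;
            }
        }
        return hints;
    }

    public static void main(String[] args) throws Exception {
        Handler first = new Handler("10.0.0.1", "10.0.0.2");
        Handler second = new Handler("10.0.0.1", "10.0.0.3");
        Handler third = new Handler("10.0.0.2");
        first.ack("10.0.0.1");
        first.ack("10.0.0.2");  // fully delivered
        second.ack("10.0.0.1"); // 10.0.0.3 never answers
        third.ack("10.0.0.2");  // delivered
        System.out.println(finish(Arrays.asList(first, second, third), 50));
    }
}
```

`main` prints `[mutation1->10.0.0.3]`: the first mutation was fully acknowledged, the second times out with one replica missing, and the third has no undelivered endpoints left to hint.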