Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 123d5bcdf -> 04fd84c3d
  refs/heads/cassandra-2.1 3a51a7e83 -> afc4e2eb2
  refs/heads/trunk 345772dd8 -> 4bc9b47fa
Fix SSTable not released if stream session fails

patch by yukim; reviewed by Richard Low for CASSANDRA-6818

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/04fd84c3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/04fd84c3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/04fd84c3

Branch: refs/heads/cassandra-2.0
Commit: 04fd84c3d536a82b1e20a6295bc09d9250b76d29
Parents: 123d5bc
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Apr 3 16:17:44 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Apr 3 16:17:44 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamSession.java      |  8 +-
 .../apache/cassandra/streaming/StreamTask.java  |  6 ++
 .../cassandra/streaming/StreamTransferTask.java | 61 +++++++++++++--
 .../cassandra/streaming/StreamWriter.java       |  3 -
 .../compress/CompressedStreamWriter.java        |  2 -
 .../streaming/StreamTransferTaskTest.java       | 80 ++++++++++++++++++++
 7 files changed, 149 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/04fd84c3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3200941..38a6c3c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -46,6 +46,7 @@
  * Fix map element access in IF (CASSANDRA-6914)
  * Avoid costly range calculations for range queries on system keyspaces (CASSANDRA-6906)
+ * Fix SSTable not released if stream session fails (CASSANDRA-6818)
 Merged from 1.2:
  * Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816)
  * add extra SSL cipher suites (CASSANDRA-6613)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/04fd84c3/src/java/org/apache/cassandra/streaming/StreamSession.java
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 7972183..449751d 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -21,7 +21,9 @@ import java.io.IOException; import java.net.InetAddress; import java.util.*; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.apache.cassandra.io.sstable.SSTableWriter; import org.slf4j.Logger; @@ -316,8 +318,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe if (finalState == State.FAILED) { - for (StreamReceiveTask srt : receivers.values()) - srt.abort(); + for (StreamTask task : Iterables.concat(receivers.values(), transfers.values())) + task.abort(); } // Note that we shouldn't block on this close because this method is called on the handler @@ -459,6 +461,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe long headerSize = header.size(); StreamingMetrics.totalOutgoingBytes.inc(headerSize); metrics.outgoingBytes.inc(headerSize); + // schedule timeout for receiving ACK + transfers.get(header.cfId).scheduleTimeout(header.sequenceNumber, 12, TimeUnit.HOURS); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/04fd84c3/src/java/org/apache/cassandra/streaming/StreamTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamTask.java b/src/java/org/apache/cassandra/streaming/StreamTask.java index 9e9e06f..ac72cff 100644 --- a/src/java/org/apache/cassandra/streaming/StreamTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamTask.java @@ -46,6 +46,12 @@ public abstract class StreamTask public abstract long 
getTotalSize(); /** + * Abort the task. + * Subclass should implement cleaning up resources. + */ + public abstract void abort(); + + /** * @return StreamSummary that describes this task */ public StreamSummary getSummary() http://git-wip-us.apache.org/repos/asf/cassandra/blob/04fd84c3/src/java/org/apache/cassandra/streaming/StreamTransferTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java index 8e461cc..86f4ee2 100644 --- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java @@ -18,6 +18,7 @@ package org.apache.cassandra.streaming; import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import org.apache.cassandra.io.sstable.SSTableReader; @@ -29,9 +30,13 @@ import org.apache.cassandra.utils.Pair; */ public class StreamTransferTask extends StreamTask { + private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(); + private final AtomicInteger sequenceNumber = new AtomicInteger(0); - private final Map<Integer, OutgoingFileMessage> files = new HashMap<>(); + private final Map<Integer, OutgoingFileMessage> files = new ConcurrentHashMap<>(); + + private final Map<Integer, ScheduledFuture> timeoutTasks = new ConcurrentHashMap<>(); private long totalSize; @@ -55,10 +60,26 @@ public class StreamTransferTask extends StreamTask */ public void complete(int sequenceNumber) { - files.remove(sequenceNumber); - // all file sent, notify session this task is complete. - if (files.isEmpty()) - session.taskCompleted(this); + OutgoingFileMessage file = files.remove(sequenceNumber); + if (file != null) + { + file.sstable.releaseReference(); + // all file sent, notify session this task is complete. 
+ if (files.isEmpty()) + { + timeoutExecutor.shutdownNow(); + session.taskCompleted(this); + } + } + } + + public void abort() + { + for (OutgoingFileMessage file : files.values()) + { + file.sstable.releaseReference(); + } + timeoutExecutor.shutdownNow(); } public int getTotalNumberOfFiles() @@ -80,6 +101,36 @@ public class StreamTransferTask extends StreamTask public OutgoingFileMessage createMessageForRetry(int sequenceNumber) { + // remove previous time out task to be rescheduled later + ScheduledFuture future = timeoutTasks.get(sequenceNumber); + future.cancel(false); return files.get(sequenceNumber); } + + /** + * Schedule timeout task to release reference for file sent. + * When not receiving ACK after sending to receiver in given time, + * the task will release reference. + * + * @param sequenceNumber sequence number of file sent. + * @param time time to timeout + * @param unit unit of given time + * @return scheduled future for timeout task + */ + public ScheduledFuture scheduleTimeout(final int sequenceNumber, long time, TimeUnit unit) + { + if (timeoutExecutor.isShutdown()) + return null; + + ScheduledFuture future = timeoutExecutor.schedule(new Runnable() + { + public void run() + { + StreamTransferTask.this.complete(sequenceNumber); + timeoutTasks.remove(sequenceNumber); + } + }, time, unit); + timeoutTasks.put(sequenceNumber, future); + return future; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/04fd84c3/src/java/org/apache/cassandra/streaming/StreamWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java index dbc7390..5609f20 100644 --- a/src/java/org/apache/cassandra/streaming/StreamWriter.java +++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java @@ -115,9 +115,6 @@ public class StreamWriter FileUtils.closeQuietly(file); FileUtils.closeQuietly(validator); } - - // 
release reference only when completed successfully - sstable.releaseReference(); } protected long totalSize() http://git-wip-us.apache.org/repos/asf/cassandra/blob/04fd84c3/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java index 80fcef5..001c927 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java @@ -83,8 +83,6 @@ public class CompressedStreamWriter extends StreamWriter // no matter what happens close file FileUtils.closeQuietly(file); } - - sstable.releaseReference(); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/04fd84c3/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java new file mode 100644 index 0000000..9b02817 --- /dev/null +++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.streaming; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.utils.FBUtilities; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class StreamTransferTaskTest extends SchemaLoader +{ + @Test + public void testScheduleTimeout() throws Exception + { + String ks = "Keyspace1"; + String cf = "Standard1"; + + StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress()); + ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf); + + // create two sstables + for (int i = 0; i < 2; i++) + { + insertData(ks, cf, i, 1); + cfs.forceBlockingFlush(); + } + + // create streaming task that streams those two sstables + StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId); + for (SSTableReader sstable : cfs.getSSTables()) + { + List<Range<Token>> ranges = new ArrayList<>(); + ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken())); + task.addTransferFile(sstable, 1, sstable.getPositionsForRanges(ranges)); + } + assertEquals(2, task.getTotalNumberOfFiles()); + + // if file sending completes before timeout 
then the task should be canceled. + ScheduledFuture f = task.scheduleTimeout(0, 1, TimeUnit.SECONDS); + task.complete(0); + // timeout task may run after complete but it is noop + f.get(); + + // when timeout runs on second file, task should be completed + f = task.scheduleTimeout(1, 1, TimeUnit.MILLISECONDS); + f.get(); + assertEquals(StreamSession.State.WAIT_COMPLETE, session.state()); + + // when all streaming are done, time out task should not be scheduled. + assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS)); + } +}