Updated Branches:
  refs/heads/cassandra-2.0 b5a2b6507 -> 36af40925

sstables from stalled repair sessions become live after a reboot and can 
resurrect deleted data
patch by jasobrown, reviewed by yukim for CASSANDRA-6503


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/36af4092
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/36af4092
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/36af4092

Branch: refs/heads/cassandra-2.0
Commit: 36af40925b3e8e01ede5ff6d7ed9a16046409fe3
Parents: b5a2b65
Author: Jason Brown <jasedbr...@gmail.com>
Authored: Thu Jan 30 16:17:01 2014 -0800
Committer: Jason Brown <jasedbr...@gmail.com>
Committed: Thu Jan 30 16:17:01 2014 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  28 ++++-
 .../apache/cassandra/io/sstable/Descriptor.java |  20 ++-
 .../apache/cassandra/io/sstable/SSTable.java    |   2 +-
 .../org/apache/cassandra/io/util/FileUtils.java |   2 +-
 .../cassandra/streaming/StreamLockfile.java     | 121 +++++++++++++++++++
 .../cassandra/streaming/StreamReader.java       |   5 +-
 .../cassandra/streaming/StreamReceiveTask.java  |  72 ++++++++---
 .../cassandra/streaming/StreamSession.java      |  16 ++-
 .../cassandra/streaming/StreamTransferTask.java |  10 +-
 .../compress/CompressedStreamReader.java        |   4 +-
 .../streaming/messages/FileMessage.java         | 112 -----------------
 .../streaming/messages/IncomingFileMessage.java |  78 ++++++++++++
 .../streaming/messages/OutgoingFileMessage.java |  92 ++++++++++++++
 .../streaming/messages/StreamMessage.java       |  18 ++-
 15 files changed, 429 insertions(+), 152 deletions(-)
----------------------------------------------------------------------
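
In outline, the patch writes a per-session lockfile before any streamed
SSTableWriter is converted into a live SSTableReader and deletes it once every
conversion has succeeded; on startup, a leftover lockfile marks sstables that
must be removed. The fragment below is only a simplified sketch of that
lifecycle: dataDir, receivedWriters and readers are hypothetical stand-ins for
the real StreamReceiveTask state, and declarations/imports are omitted.

    // Simplified sketch of the lockfile lifecycle introduced by this patch.
    StreamLockfile lockfile = new StreamLockfile(dataDir, UUID.randomUUID());
    lockfile.create(receivedWriters);             // record writers before converting any
    List<SSTableReader> readers = new ArrayList<>();
    for (SSTableWriter writer : receivedWriters)
        readers.add(writer.closeAndOpenReader()); // a crash here leaves the lockfile behind
    lockfile.delete();                            // all conversions succeeded

    // On restart, ColumnFamilyStore.scrubDataDirectories() finds any leftover
    // *.lockfile, deletes the sstables it references (cleanup()), then removes
    // the lockfile itself (delete()).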


http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 531ac15..56a72ef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,7 @@
  * Release sstables upon rebuilding 2i (CASSANDRA-6635)
  * Add AbstractCompactionStrategy.startup() method (CASSANDRA-6637)
  * SSTableScanner may skip rows during cleanup (CASSANDRA-6638)
+ * sstables from stalled repair sessions can resurrect deleted data 
(CASSANDRA-6503)
 Merged from 1.2:
  * fsync compression metadata (CASSANDRA-6531)
  * Validate CF existence on execution for prepared statement (CASSANDRA-6535)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 8d09453..8750026 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.db;
 
 import java.io.File;
+import java.io.FileFilter;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
@@ -66,6 +67,7 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.ColumnFamilyMetrics;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.StreamLockfile;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
@@ -424,9 +426,33 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
      */
     public static void scrubDataDirectories(String keyspaceName, String 
columnFamily)
     {
+        Directories directories = Directories.create(keyspaceName, 
columnFamily);
+
+        // remove any left-behind SSTables from failed/stalled streaming
+        FileFilter filter = new FileFilter()
+        {
+            public boolean accept(File pathname)
+            {
+                return pathname.toString().endsWith(StreamLockfile.FILE_EXT);
+            }
+        };
+        for (File dir : directories.getCFDirectories())
+        {
+            File[] lockfiles = dir.listFiles(filter);
+            if (lockfiles.length == 0)
+                continue;
+            logger.info("Removing SSTables from failed streaming session. Found {} files to clean up.", lockfiles.length);
+
+            for (File lockfile : lockfiles)
+            {
+                StreamLockfile streamLockfile = new StreamLockfile(lockfile);
+                streamLockfile.cleanup();
+                streamLockfile.delete();
+            }
+        }
+
         logger.debug("Removing compacted SSTable files from {} (see 
http://wiki.apache.org/cassandra/MemtableSSTable)", columnFamily);
 
-        Directories directories = Directories.create(keyspaceName, 
columnFamily);
         for (Map.Entry<Descriptor,Set<Component>> sstableFiles : 
directories.sstableLister().list().entrySet())
         {
             Descriptor desc = sstableFiles.getKey();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java 
b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index 1b29c1c..d65da45 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -201,7 +201,18 @@ public class Descriptor
     public static Descriptor fromFilename(String filename)
     {
         File file = new File(filename);
-        return fromFilename(file.getParentFile(), file.getName()).left;
+        return fromFilename(file.getParentFile(), file.getName(), false).left;
+    }
+
+    public static Descriptor fromFilename(String filename, boolean 
skipComponent)
+    {
+        File file = new File(filename);
+        return fromFilename(file.getParentFile(), file.getName(), 
skipComponent).left;
+    }
+
+    public static Pair<Descriptor,String> fromFilename(File directory, String 
name)
+    {
+        return fromFilename(directory, name, false);
     }
 
     /**
@@ -209,10 +220,11 @@ public class Descriptor
      *
      * @param directory The directory of the SSTable files
      * @param name The name of the SSTable file
+     * @param skipComponent true if the name param should not be parsed for a 
component tag
      *
      * @return A Descriptor for the SSTable, and the Component remainder.
      */
-    public static Pair<Descriptor,String> fromFilename(File directory, String 
name)
+    public static Pair<Descriptor,String> fromFilename(File directory, String 
name, boolean skipComponent)
     {
         // tokenize the filename
         StringTokenizer st = new StringTokenizer(name, 
String.valueOf(separator));
@@ -239,7 +251,9 @@ public class Descriptor
         int generation = Integer.parseInt(nexttok);
 
         // component suffix
-        String component = st.nextToken();
+        String component = null;
+        if (!skipComponent)
+            component = st.nextToken();
         directory = directory != null ? directory : new File(".");
         return Pair.create(new Descriptor(version, directory, ksname, cfname, 
generation, temporary), component);
     }
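
The new skipComponent flag exists because a lockfile (added later in this
patch) records base sstable paths with no trailing component such as
"-Data.db". A hedged usage sketch, mirroring what StreamLockfile.cleanup()
does per recorded line; the path is made up, and componentsFor() is made
public in SSTable.java below for exactly this purpose:

    // Parse a component-less base filename, then delete all on-disk components.
    String base = "/var/lib/cassandra/data/ks1/cf1/ks1-cf1-jb-42";  // hypothetical
    Descriptor desc = Descriptor.fromFilename(base, true);  // skipComponent = true
    SSTable.delete(desc, SSTable.componentsFor(desc));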

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java 
b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index eaa4522..69c6521 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -205,7 +205,7 @@ public abstract class SSTable
     /**
      * Discovers existing components for the descriptor. Slow: only intended 
for use outside the critical path.
      */
-    static Set<Component> componentsFor(final Descriptor desc)
+    public static Set<Component> componentsFor(final Descriptor desc)
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java 
b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 6b91bd3..0d8538e 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -310,7 +310,7 @@ public class FileUtils
         return f.delete();
     }
 
-    public static void delete(File[] files)
+    public static void delete(File... files)
     {
         for ( File file : files )
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/streaming/StreamLockfile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamLockfile.java 
b/src/java/org/apache/cassandra/streaming/StreamLockfile.java
new file mode 100644
index 0000000..0eb01c5
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamLockfile.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.*;
+
+import com.google.common.base.Charsets;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.io.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates the behavior for 'locking' any sstables streamed to a node.
+ * If a process crashes while converting a set of SSTableWriters to SSTableReaders
+ * (meaning, some subset of SSTWs were converted, but not the entire set), we 
want
+ * to disregard the entire set as we will surely have missing data (by 
definition).
+ *
+ * Basic behavior is to write out the names of all SSTWs to a file, one SSTW 
per line,
+ * and then delete the file when complete (normal behavior). This should 
happen before
+ * converting any SSTWs. Thus, the lockfile is created, some SSTWs are 
converted,
+ * and if the process crashes, on restart, we look for any existing lockfile, 
and delete
+ * any referenced SSTRs.
+ */
+public class StreamLockfile
+{
+    public static final String FILE_EXT = ".lockfile";
+    private static final Logger logger = 
LoggerFactory.getLogger(StreamLockfile.class);
+
+    private final File lockfile;
+
+    public StreamLockfile(File directory, UUID uuid)
+    {
+        lockfile = new File(directory, uuid.toString() + FILE_EXT);
+    }
+
+    public StreamLockfile(File lockfile)
+    {
+        assert lockfile != null;
+        this.lockfile = lockfile;
+    }
+
+    public void create(Collection<SSTableWriter> sstables)
+    {
+        List<String> sstablePaths = new ArrayList<>(sstables.size());
+        for (SSTableWriter writer : sstables)
+        {
+            /* write out the file names *without* the 'tmp-file' flag in the 
file name.
+               this class will not need to clean up tmp files (on restart), 
CassandraDaemon does that already,
+               just make sure we delete the fully-formed SSTRs. */
+            
sstablePaths.add(writer.descriptor.asTemporary(false).baseFilename());
+        }
+
+        try
+        {
+            Files.write(lockfile.toPath(), sstablePaths, Charsets.UTF_8,
+                    StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE, 
StandardOpenOption.DSYNC);
+        }
+        catch (IOException e)
+        {
+            logger.warn(String.format("Could not create lockfile %s for stream 
session, nothing to worry too much about", lockfile), e);
+        }
+    }
+
+    public void delete()
+    {
+        FileUtils.delete(lockfile);
+    }
+
+    public void cleanup()
+    {
+        List<String> files = readLockfile(lockfile);
+        for (String file : files)
+        {
+            try
+            {
+                Descriptor desc = Descriptor.fromFilename(file, true);
+                SSTable.delete(desc, SSTable.componentsFor(desc));
+            }
+            catch (Exception e)
+            {
+                logger.warn("failed to delete a potentially stale sstable {}", 
file);
+            }
+        }
+    }
+
+    private List<String> readLockfile(File lockfile)
+    {
+        try
+        {
+            return Files.readAllLines(lockfile.toPath(), Charsets.UTF_8);
+        }
+        catch (IOException e)
+        {
+            logger.info("couldn't read lockfile {}, ignoring", 
lockfile.getAbsolutePath());
+            return Collections.emptyList();
+        }
+    }
+
+}
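
As the class javadoc above explains, a lockfile is just a line-oriented text
file, placed in the column family's data directory under a freshly generated
UUID plus the ".lockfile" extension, holding one base sstable path per line
(no "tmp" marker, no component suffix). Hypothetical contents might look like:

    /var/lib/cassandra/data/ks1/cf1/ks1-cf1-jb-17
    /var/lib/cassandra/data/ks1/cf1/ks1-cf1-jb-18

Writing it with CREATE_NEW and DSYNC means creation fails if a file of that
name already exists and the contents are forced to disk before any writer is
converted, so a crash cannot leave behind a lockfile that lists fewer sstables
than were actually being converted.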

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java 
b/src/java/org/apache/cassandra/streaming/StreamReader.java
index d72cb5e..72c239c 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -36,7 +36,6 @@ import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.messages.FileMessageHeader;
@@ -71,7 +70,7 @@ public class StreamReader
      * @return SSTable transferred
      * @throws IOException if reading the remote sstable fails. Will throw an 
RTE if local write fails.
      */
-    public SSTableReader read(ReadableByteChannel channel) throws IOException
+    public SSTableWriter read(ReadableByteChannel channel) throws IOException
     {
         long totalSize = totalSize();
 
@@ -89,7 +88,7 @@ public class StreamReader
                 // TODO move this to BytesReadTracker
                 session.progress(desc, ProgressInfo.Direction.IN, 
in.getBytesRead(), totalSize);
             }
-            return writer.closeAndOpenReader();
+            return writer;
         }
         catch (Throwable e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java 
b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index ac21352..9a2568d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -19,12 +19,15 @@ package org.apache.cassandra.streaming;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.UUID;
 
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -36,16 +39,17 @@ public class StreamReceiveTask extends StreamTask
     private final int totalFiles;
     // total size of files to receive
     private final long totalSize;
+    private volatile boolean aborted;
 
     //  holds references to SSTables received
-    protected Collection<SSTableReader> sstables;
+    protected Collection<SSTableWriter> sstables;
 
     public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, 
long totalSize)
     {
         super(session, cfId);
         this.totalFiles = totalFiles;
         this.totalSize = totalSize;
-        this.sstables =  new ArrayList<>(totalFiles);
+        this.sstables = new ArrayList<>(totalFiles);
     }
 
     /**
@@ -53,9 +57,10 @@ public class StreamReceiveTask extends StreamTask
      *
      * @param sstable SSTable file received.
      */
-    public void received(SSTableReader sstable)
+    public void received(SSTableWriter sstable)
     {
         assert cfId.equals(sstable.metadata.cfId);
+        assert !aborted;
 
         sstables.add(sstable);
         if (sstables.size() == totalFiles)
@@ -72,24 +77,61 @@ public class StreamReceiveTask extends StreamTask
         return totalSize;
     }
 
-    // TODO should be run in background so that this does not block streaming
     private void complete()
     {
-        if (!SSTableReader.acquireReferences(sstables))
-            throw new AssertionError("We shouldn't fail acquiring a reference 
on a sstable that has just been transferred");
-        try
+        if (!sstables.isEmpty())
+            StorageService.tasks.submit(new OnCompletionRunnable(this));
+    }
+
+    private static class OnCompletionRunnable implements Runnable
+    {
+        private final StreamReceiveTask task;
+
+        public OnCompletionRunnable(StreamReceiveTask task)
         {
-            Pair<String, String> kscf = Schema.instance.getCF(cfId);
-            ColumnFamilyStore cfs = 
Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-            // add sstables and build secondary indexes
-            cfs.addSSTables(sstables);
-            cfs.indexManager.maybeBuildSecondaryIndexes(sstables, 
cfs.indexManager.allIndexesNames());
+            this.task = task;
         }
-        finally
+
+        public void run()
         {
-            SSTableReader.releaseReferences(sstables);
+            Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
+            ColumnFamilyStore cfs = 
Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+            StreamLockfile lockfile = new 
StreamLockfile(cfs.directories.getWriteableLocationAsFile(), UUID.randomUUID());
+            lockfile.create(task.sstables);
+            List<SSTableReader> readers = new ArrayList<>();
+            for (SSTableWriter writer : task.sstables)
+                readers.add(writer.closeAndOpenReader());
+            lockfile.delete();
+
+            if (!SSTableReader.acquireReferences(readers))
+                throw new AssertionError("We shouldn't fail acquiring a 
reference on a sstable that has just been transferred");
+            try
+            {
+                // add sstables and build secondary indexes
+                cfs.addSSTables(readers);
+                cfs.indexManager.maybeBuildSecondaryIndexes(readers, 
cfs.indexManager.allIndexesNames());
+            }
+            finally
+            {
+                SSTableReader.releaseReferences(readers);
+            }
+
+            task.session.taskCompleted(task);
         }
+    }
 
-        session.taskCompleted(this);
+    public void abort()
+    {
+        aborted = true;
+        Runnable r = new Runnable()
+        {
+            public void run()
+            {
+                for (SSTableWriter writer : sstables)
+                    writer.abort();
+            }
+        };
+        StorageService.tasks.submit(r);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java 
b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 98a76fc..4777995 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -23,6 +23,7 @@ import java.util.*;
 import java.util.concurrent.Future;
 
 import com.google.common.collect.Lists;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -312,6 +313,12 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber, IFailureDe
     {
         state(finalState);
 
+        if (finalState == State.FAILED)
+        {
+            for (StreamReceiveTask srt : receivers.values())
+                srt.abort();
+        }
+
         // Note that we shouldn't block on this close because this method is 
called on the handler
         // incoming thread (so we would deadlock).
         handler.close();
@@ -359,7 +366,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber, IFailureDe
                 break;
 
             case FILE:
-                receive((FileMessage) message);
+                receive((IncomingFileMessage) message);
                 break;
 
             case RECEIVED:
@@ -458,7 +465,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber, IFailureDe
      *
      * @param message received file
      */
-    public void receive(FileMessage message)
+    public void receive(IncomingFileMessage message)
     {
         long headerSize = message.header.size();
         StreamingMetrics.totalIncomingBytes.inc(headerSize);
@@ -487,7 +494,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber, IFailureDe
      */
     public void retry(UUID cfId, int sequenceNumber)
     {
-        FileMessage message = 
transfers.get(cfId).createMessageForRetry(sequenceNumber);
+        OutgoingFileMessage message = 
transfers.get(cfId).createMessageForRetry(sequenceNumber);
         handler.sendMessage(message);
     }
 
@@ -502,6 +509,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber, IFailureDe
         }
         else
         {
+            handler.sendMessage(new CompleteMessage());
             state(State.WAIT_COMPLETE);
         }
     }
@@ -623,7 +631,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber, IFailureDe
         state(State.STREAMING);
         for (StreamTransferTask task : transfers.values())
         {
-            Collection<FileMessage> messages = task.getFileMessages();
+            Collection<OutgoingFileMessage> messages = task.getFileMessages();
             if (messages.size() > 0)
                 handler.sendMessages(messages);
             else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java 
b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index 61ad058..8e461cc 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -21,7 +21,7 @@ import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.streaming.messages.FileMessage;
+import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -31,7 +31,7 @@ public class StreamTransferTask extends StreamTask
 {
     private final AtomicInteger sequenceNumber = new AtomicInteger(0);
 
-    private final Map<Integer, FileMessage> files = new HashMap<>();
+    private final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
 
     private long totalSize;
 
@@ -43,7 +43,7 @@ public class StreamTransferTask extends StreamTask
     public void addTransferFile(SSTableReader sstable, long estimatedKeys, 
List<Pair<Long, Long>> sections)
     {
         assert sstable != null && cfId.equals(sstable.metadata.cfId);
-        FileMessage message = new FileMessage(sstable, 
sequenceNumber.getAndIncrement(), estimatedKeys, sections);
+        OutgoingFileMessage message = new OutgoingFileMessage(sstable, 
sequenceNumber.getAndIncrement(), estimatedKeys, sections);
         files.put(message.header.sequenceNumber, message);
         totalSize += message.header.size();
     }
@@ -71,14 +71,14 @@ public class StreamTransferTask extends StreamTask
         return totalSize;
     }
 
-    public Collection<FileMessage> getFileMessages()
+    public Collection<OutgoingFileMessage> getFileMessages()
     {
         // We may race between queuing all those messages and the completion of the first ones.
         // So copy the values to avoid a ConcurrentModificationException
         return new ArrayList<>(files.values());
     }
 
-    public FileMessage createMessageForRetry(int sequenceNumber)
+    public OutgoingFileMessage createMessageForRetry(int sequenceNumber)
     {
         return files.get(sequenceNumber);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java 
b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 6f5d0f5..4aac941 100644
--- 
a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ 
b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -55,7 +55,7 @@ public class CompressedStreamReader extends StreamReader
      * @throws java.io.IOException if reading the remote sstable fails. Will 
throw an RTE if local write fails.
      */
     @Override
-    public SSTableReader read(ReadableByteChannel channel) throws IOException
+    public SSTableWriter read(ReadableByteChannel channel) throws IOException
     {
         long totalSize = totalSize();
 
@@ -81,7 +81,7 @@ public class CompressedStreamReader extends StreamReader
                     session.progress(desc, ProgressInfo.Direction.IN, 
cis.getTotalCompressedBytesRead(), totalSize);
                 }
             }
-            return writer.closeAndOpenReader();
+            return writer;
         }
         catch (Throwable e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/streaming/messages/FileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/FileMessage.java
deleted file mode 100644
index a0543c0..0000000
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessage.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming.messages;
-
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.util.List;
-
-import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.streaming.StreamReader;
-import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.streaming.StreamWriter;
-import org.apache.cassandra.streaming.compress.CompressedStreamReader;
-import org.apache.cassandra.streaming.compress.CompressedStreamWriter;
-import org.apache.cassandra.streaming.compress.CompressionInfo;
-import org.apache.cassandra.utils.Pair;
-
-/**
- * FileMessage is used to transfer/receive the part(or whole) of a SSTable 
data file.
- */
-public class FileMessage extends StreamMessage
-{
-    public static Serializer<FileMessage> serializer = new 
Serializer<FileMessage>()
-    {
-        public FileMessage deserialize(ReadableByteChannel in, int version, 
StreamSession session) throws IOException
-        {
-            DataInputStream input = new 
DataInputStream(Channels.newInputStream(in));
-            FileMessageHeader header = 
FileMessageHeader.serializer.deserialize(input, version);
-            StreamReader reader = header.compressionInfo == null ? new 
StreamReader(header, session)
-                                          : new CompressedStreamReader(header, 
session);
-
-            try
-            {
-                return new FileMessage(reader.read(in), header);
-            }
-            catch (Throwable e)
-            {
-                session.doRetry(header, e);
-                return null;
-            }
-        }
-
-        public void serialize(FileMessage message, WritableByteChannel out, 
int version, StreamSession session) throws IOException
-        {
-            DataOutput output = new 
DataOutputStream(Channels.newOutputStream(out));
-            FileMessageHeader.serializer.serialize(message.header, output, 
version);
-            StreamWriter writer = message.header.compressionInfo == null ?
-                                          new StreamWriter(message.sstable, 
message.header.sections, session) :
-                                          new 
CompressedStreamWriter(message.sstable,
-                                                                     
message.header.sections,
-                                                                     
message.header.compressionInfo, session);
-            writer.write(out);
-            session.fileSent(message.header);
-        }
-    };
-
-    public final FileMessageHeader header;
-    public final SSTableReader sstable;
-
-    public FileMessage(SSTableReader sstable, FileMessageHeader header)
-    {
-        super(Type.FILE);
-        this.header = header;
-        this.sstable = sstable;
-    }
-
-    public FileMessage(SSTableReader sstable, int sequenceNumber, long 
estimatedKeys, List<Pair<Long, Long>> sections)
-    {
-        super(Type.FILE);
-        this.sstable = sstable;
-
-        CompressionInfo compressionInfo = null;
-        if (sstable.compression)
-        {
-            CompressionMetadata meta = sstable.getCompressionMetadata();
-            compressionInfo = new 
CompressionInfo(meta.getChunksForSections(sections), meta.parameters);
-        }
-        this.header = new FileMessageHeader(sstable.metadata.cfId,
-                                            sequenceNumber,
-                                            
sstable.descriptor.version.toString(),
-                                            estimatedKeys,
-                                            sections,
-                                            compressionInfo);
-    }
-
-    @Override
-    public String toString()
-    {
-        return "File (" + header + ", file: " + sstable.getFilename() + ")";
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
new file mode 100644
index 0000000..a403390
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.streaming.StreamReader;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.compress.CompressedStreamReader;
+
+/**
+ * IncomingFileMessage is used to receive part (or all) of an SSTable data file.
+ */
+public class IncomingFileMessage extends StreamMessage
+{
+    public static Serializer<IncomingFileMessage> serializer = new 
Serializer<IncomingFileMessage>()
+    {
+        public IncomingFileMessage deserialize(ReadableByteChannel in, int 
version, StreamSession session) throws IOException
+        {
+            DataInputStream input = new 
DataInputStream(Channels.newInputStream(in));
+            FileMessageHeader header = 
FileMessageHeader.serializer.deserialize(input, version);
+            StreamReader reader = header.compressionInfo == null ? new 
StreamReader(header, session)
+                    : new CompressedStreamReader(header, session);
+
+            try
+            {
+                return new IncomingFileMessage(reader.read(in), header);
+            }
+            catch (Throwable e)
+            {
+                session.doRetry(header, e);
+                return null;
+            }
+        }
+
+        public void serialize(IncomingFileMessage message, WritableByteChannel 
out, int version, StreamSession session) throws IOException
+        {
+            throw new UnsupportedOperationException("Not allowed to call 
serialize on an incoming file");
+        }
+    };
+
+    public FileMessageHeader header;
+    public SSTableWriter sstable;
+
+    public IncomingFileMessage(SSTableWriter sstable, FileMessageHeader header)
+    {
+        super(Type.FILE);
+        this.header = header;
+        this.sstable = sstable;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "File (" + header + ", file: " + sstable.getFilename() + ")";
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
new file mode 100644
index 0000000..1fa115f
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamWriter;
+import org.apache.cassandra.streaming.compress.CompressedStreamWriter;
+import org.apache.cassandra.streaming.compress.CompressionInfo;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * OutgoingFileMessage is used to transfer part (or all) of an SSTable data file.
+ */
+public class OutgoingFileMessage extends StreamMessage
+{
+    public static Serializer<OutgoingFileMessage> serializer = new 
Serializer<OutgoingFileMessage>()
+    {
+        public OutgoingFileMessage deserialize(ReadableByteChannel in, int 
version, StreamSession session) throws IOException
+        {
+            throw new UnsupportedOperationException("Not allowed to call 
deserialize on an outgoing file");
+        }
+
+        public void serialize(OutgoingFileMessage message, WritableByteChannel 
out, int version, StreamSession session) throws IOException
+        {
+            DataOutput output = new 
DataOutputStream(Channels.newOutputStream(out));
+            FileMessageHeader.serializer.serialize(message.header, output, 
version);
+
+            final SSTableReader reader = message.sstable;
+            StreamWriter writer = message.header.compressionInfo == null ?
+                    new StreamWriter(reader, message.header.sections, session) 
:
+                    new CompressedStreamWriter(reader,
+                            message.header.sections,
+                            message.header.compressionInfo, session);
+            writer.write(out);
+            session.fileSent(message.header);
+        }
+    };
+
+    public FileMessageHeader header;
+    public SSTableReader sstable;
+
+    public OutgoingFileMessage(SSTableReader sstable, int sequenceNumber, long 
estimatedKeys, List<Pair<Long, Long>> sections)
+    {
+        super(Type.FILE);
+        this.sstable = sstable;
+
+        CompressionInfo compressionInfo = null;
+        if (sstable.compression)
+        {
+            CompressionMetadata meta = sstable.getCompressionMetadata();
+            compressionInfo = new 
CompressionInfo(meta.getChunksForSections(sections), meta.parameters);
+        }
+        this.header = new FileMessageHeader(sstable.metadata.cfId,
+                sequenceNumber,
+                sstable.descriptor.version.toString(),
+                estimatedKeys,
+                sections,
+                compressionInfo);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "File (" + header + ", file: " + sstable.getFilename() + ")";
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index 9e146e8..7010c95 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -42,7 +42,7 @@ public abstract class StreamMessage
         buff.flip();
         while (buff.hasRemaining())
             out.write(buff);
-        message.type.serializer.serialize(message, out, version, session);
+        message.type.outSerializer.serialize(message, out, version, session);
     }
 
     public static StreamMessage deserialize(ReadableByteChannel in, int 
version, StreamSession session) throws IOException
@@ -52,7 +52,7 @@ public abstract class StreamMessage
         {
             buff.flip();
             Type type = Type.get(buff.get());
-            return type.serializer.deserialize(in, version, session);
+            return type.inSerializer.deserialize(in, version, session);
         }
         else
         {
@@ -73,7 +73,7 @@ public abstract class StreamMessage
     public static enum Type
     {
         PREPARE(1, 5, PrepareMessage.serializer),
-        FILE(2, 0, FileMessage.serializer),
+        FILE(2, 0, IncomingFileMessage.serializer, 
OutgoingFileMessage.serializer),
         RECEIVED(3, 4, ReceivedMessage.serializer),
         RETRY(4, 4, RetryMessage.serializer),
         COMPLETE(5, 1, CompleteMessage.serializer),
@@ -91,14 +91,22 @@ public abstract class StreamMessage
 
         private final byte type;
         public final int priority;
-        public final Serializer<StreamMessage> serializer;
+        public final Serializer<StreamMessage> inSerializer;
+        public final Serializer<StreamMessage> outSerializer;
 
         @SuppressWarnings("unchecked")
         private Type(int type, int priority, Serializer serializer)
         {
+            this(type, priority, serializer, serializer);
+        }
+
+        @SuppressWarnings("unchecked")
+        private Type(int type, int priority, Serializer inSerializer, 
Serializer outSerializer)
+        {
             this.type = (byte) type;
             this.priority = priority;
-            this.serializer = serializer;
+            this.inSerializer = inSerializer;
+            this.outSerializer = outSerializer;
         }
     }
 
