Author: jbellis
Date: Mon Apr 18 21:58:20 2011
New Revision: 1094789

URL: http://svn.apache.org/viewvc?rev=1094789&view=rev
Log:
preserve version when streaming data from old sstables
patch by jbellis; reviewed by Stu Hood for CASSANDRA-2283

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamIn.java
    
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
    
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/SerializationsTest.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1094789&r1=1094788&r2=1094789&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Mon Apr 18 21:58:20 2011
@@ -30,6 +30,7 @@
  * use 64KB flush buffer instead of in_memory_compaction_limit (CASSANDRA-2463)
  * fix duplicate results from CFS.scan (CASSANDRA-2406)
  * avoid caching token-only decoratedkeys (CASSANDRA-2416)
+ * preserve version when streaming data from old sstables (CASSANDRA-2283)
 
 
 0.7.4

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1094789&r1=1094788&r2=1094789&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
 Mon Apr 18 21:58:20 2011
@@ -657,23 +657,29 @@ public class ColumnFamilyStore implement
         return columnFamily;
     }
 
+    public String getFlushPath()
+    {
+        return getFlushPath(Descriptor.CURRENT_VERSION);
+    }
+
     /*
      * @return a temporary file name for an sstable.
      * When the sstable object is closed, it will be renamed to a non-temporary
      * format, so incomplete sstables can be recognized and removed on startup.
      */
-    public String getFlushPath()
+    public String getFlushPath(String version)
     {
         long guessedSize = 2L * memsize.value() * 1024*1024; // 2* adds room 
for keys, column indexes
         String location = 
DatabaseDescriptor.getDataFileLocationForTable(table.name, guessedSize);
         if (location == null)
             throw new RuntimeException("Insufficient disk space to flush");
-        return getTempSSTablePath(location);
+        return getTempSSTablePath(location, version);
     }
 
-    public String getTempSSTablePath(String directory)
+    public String getTempSSTablePath(String directory, String version)
     {
-        Descriptor desc = new Descriptor(new File(directory),
+        Descriptor desc = new Descriptor(version,
+                                         new File(directory),
                                          table.name,
                                          columnFamily,
                                          fileIndexGenerator.incrementAndGet(),
@@ -681,6 +687,11 @@ public class ColumnFamilyStore implement
         return desc.filenameFor(Component.DATA);
     }
 
+    public String getTempSSTablePath(String directory)
+    {
+        return getTempSSTablePath(directory, Descriptor.CURRENT_VERSION);
+    }
+
     /** flush the given memtable and swap in a new one for its CFS, if it 
hasn't been frozen already.  threadsafe. */
     Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean 
writeCommitLog)
     {

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamIn.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=1094789&r1=1094788&r2=1094789&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamIn.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamIn.java
 Mon Apr 18 21:58:20 2011
@@ -49,11 +49,6 @@ public class StreamIn
     /**
      * Request ranges to be transferred from source to local node
      */
-    public static void requestRanges(InetAddress source, String tableName, 
Collection<Range> ranges)
-    {
-        requestRanges(source, tableName, ranges, null);
-    }
-
     public static void requestRanges(InetAddress source, String tableName, 
Collection<Range> ranges, Runnable callback)
     {
         assert ranges.size() > 0;
@@ -74,7 +69,7 @@ public class StreamIn
         // new local sstable
         Table table = Table.open(remotedesc.ksname);
         ColumnFamilyStore cfStore = 
table.getColumnFamilyStore(remotedesc.cfname);
-        Descriptor localdesc = Descriptor.fromFilename(cfStore.getFlushPath());
+        Descriptor localdesc = 
Descriptor.fromFilename(cfStore.getFlushPath(remote.desc.version));
 
         return new PendingFile(localdesc, remote);
      }

Modified: 
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/BootstrapTest.java?rev=1094789&r1=1094788&r2=1094789&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
 Mon Apr 18 21:58:20 2011
@@ -37,6 +37,7 @@ public class BootstrapTest extends Schem
     public void testGetNewNames() throws IOException
     {
         Descriptor desc = Descriptor.fromFilename(new File("Keyspace1", 
"Standard1-500-Data.db").toString());
+        assert !desc.isLatestVersion; // deliberately test old version; see 
CASSANDRA-2283
         PendingFile inContext = new PendingFile(null, desc, "Data.db", 
Arrays.asList(new Pair<Long,Long>(0L, 1L)));
 
         PendingFile outContext = StreamIn.getContextMapping(inContext);
@@ -45,7 +46,8 @@ public class BootstrapTest extends Schem
 
         // nothing else should
         assertEquals(inContext.component, outContext.component);
-        assertEquals(inContext.desc.ksname, outContext.desc.ksname);
-        assertEquals(inContext.desc.cfname, outContext.desc.cfname);
+        assertEquals(desc.ksname, outContext.desc.ksname);
+        assertEquals(desc.cfname, outContext.desc.cfname);
+        assertEquals(desc.version, outContext.desc.version);
     }
 }

Modified: 
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/SerializationsTest.java?rev=1094789&r1=1094788&r2=1094789&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
 Mon Apr 18 21:58:20 2011
@@ -50,9 +50,9 @@ public class SerializationsTest extends 
     private void testPendingFileWrite() throws IOException
     {
         // make sure to test serializing null and a pf with no sstable.
-        PendingFile normal = makePendingFile(true, "fake_component", 100);
-        PendingFile noSections = makePendingFile(true, "not_real", 0);
-        PendingFile noSST = makePendingFile(false, "also_fake", 100);
+        PendingFile normal = makePendingFile(true, 100);
+        PendingFile noSections = makePendingFile(true, 0);
+        PendingFile noSST = makePendingFile(false, 100);
         
         DataOutputStream out = getOutput("streaming.PendingFile.bin");
         PendingFile.serializer().serialize(normal, out);
@@ -78,14 +78,14 @@ public class SerializationsTest extends 
     
     private void testStreamHeaderWrite() throws IOException
     {
-        StreamHeader sh0 = new StreamHeader("Keyspace1", 123L, 
makePendingFile(true, "zz", 100));
-        StreamHeader sh1 = new StreamHeader("Keyspace1", 124L, 
makePendingFile(false, "zz", 100));
+        StreamHeader sh0 = new StreamHeader("Keyspace1", 123L, 
makePendingFile(true, 100));
+        StreamHeader sh1 = new StreamHeader("Keyspace1", 124L, 
makePendingFile(false, 100));
         Collection<PendingFile> files = new ArrayList<PendingFile>();
         for (int i = 0; i < 50; i++)
-            files.add(makePendingFile(i % 2 == 0, "aa", 100));
-        StreamHeader sh2 = new StreamHeader("Keyspace1", 125L, 
makePendingFile(true, "bb", 100), files);
+            files.add(makePendingFile(i % 2 == 0, 100));
+        StreamHeader sh2 = new StreamHeader("Keyspace1", 125L, 
makePendingFile(true, 100), files);
         StreamHeader sh3 = new StreamHeader("Keyspace1", 125L, null, files);
-        StreamHeader sh4 = new StreamHeader("Keyspace1", 125L, 
makePendingFile(true, "bb", 100), new ArrayList<PendingFile>());
+        StreamHeader sh4 = new StreamHeader("Keyspace1", 125L, 
makePendingFile(true, 100), new ArrayList<PendingFile>());
         
         DataOutputStream out = getOutput("streaming.StreamHeader.bin");
         StreamHeader.serializer().serialize(sh0, out);
@@ -132,13 +132,13 @@ public class SerializationsTest extends 
         in.close();
     }
     
-    private static PendingFile makePendingFile(boolean sst, String comp, int 
numSecs)
+    private static PendingFile makePendingFile(boolean sst, int numSecs)
     {
         Descriptor desc = new Descriptor("z", new File("path/doesn't/matter"), 
"Keyspace1", "Standard1", 23, false);
         List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
         for (int i = 0; i < numSecs; i++)
             sections.add(new Pair<Long, Long>(new Long(i), new Long(i * i)));
-        return new PendingFile(sst ? makeSSTable() : null, desc, comp, 
sections);
+        return new PendingFile(sst ? makeSSTable() : null, desc, 
SSTable.COMPONENT_DATA, sections);
     }
     
     private void testStreamRequestMessageWrite() throws IOException
@@ -147,8 +147,8 @@ public class SerializationsTest extends 
         for (int i = 0; i < 5; i++)
             ranges.add(new Range(new 
BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i))), new 
BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i+5)))));
         StreamRequestMessage msg0 = new 
StreamRequestMessage(FBUtilities.getLocalAddress(), ranges, "Keyspace1", 123L);
-        StreamRequestMessage msg1 = new 
StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(true, "aa", 
100), 124L);
-        StreamRequestMessage msg2 = new 
StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(false, 
"aa", 100), 124L);
+        StreamRequestMessage msg1 = new 
StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(true, 100), 
124L);
+        StreamRequestMessage msg2 = new 
StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(false, 
100), 124L);
         
         DataOutputStream out = getOutput("streaming.StreamRequestMessage.bin");
         StreamRequestMessage.serializer().serialize(msg0, out);


Reply via email to