Author: jbellis
Date: Mon Apr 18 21:58:20 2011
New Revision: 1094789

URL: http://svn.apache.org/viewvc?rev=1094789&view=rev
Log:
preserve version when streaming data from old sstables
patch by jbellis; reviewed by Stu Hood for CASSANDRA-2283
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamIn.java cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/BootstrapTest.java cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/SerializationsTest.java Modified: cassandra/branches/cassandra-0.7/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1094789&r1=1094788&r2=1094789&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.7/CHANGES.txt Mon Apr 18 21:58:20 2011 @@ -30,6 +30,7 @@ * use 64KB flush buffer instead of in_memory_compaction_limit (CASSANDRA-2463) * fix duplicate results from CFS.scan (CASSANDRA-2406) * avoid caching token-only decoratedkeys (CASSANDRA-2416) + * preserve version when streaming data from old sstables (CASSANDRA-2283) 0.7.4 Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1094789&r1=1094788&r2=1094789&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Apr 18 21:58:20 2011 @@ -657,23 +657,29 @@ public class ColumnFamilyStore implement return columnFamily; } + public String getFlushPath() + { + return getFlushPath(Descriptor.CURRENT_VERSION); + } + /* * @return a temporary file name for an sstable. 
* When the sstable object is closed, it will be renamed to a non-temporary * format, so incomplete sstables can be recognized and removed on startup. */ - public String getFlushPath() + public String getFlushPath(String version) { long guessedSize = 2L * memsize.value() * 1024*1024; // 2* adds room for keys, column indexes String location = DatabaseDescriptor.getDataFileLocationForTable(table.name, guessedSize); if (location == null) throw new RuntimeException("Insufficient disk space to flush"); - return getTempSSTablePath(location); + return getTempSSTablePath(location, version); } - public String getTempSSTablePath(String directory) + public String getTempSSTablePath(String directory, String version) { - Descriptor desc = new Descriptor(new File(directory), + Descriptor desc = new Descriptor(version, + new File(directory), table.name, columnFamily, fileIndexGenerator.incrementAndGet(), @@ -681,6 +687,11 @@ public class ColumnFamilyStore implement return desc.filenameFor(Component.DATA); } + public String getTempSSTablePath(String directory) + { + return getTempSSTablePath(directory, Descriptor.CURRENT_VERSION); + } + /** flush the given memtable and swap in a new one for its CFS, if it hasn't been frozen already. threadsafe. 
*/ Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean writeCommitLog) { Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamIn.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=1094789&r1=1094788&r2=1094789&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamIn.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamIn.java Mon Apr 18 21:58:20 2011 @@ -49,11 +49,6 @@ public class StreamIn /** * Request ranges to be transferred from source to local node */ - public static void requestRanges(InetAddress source, String tableName, Collection<Range> ranges) - { - requestRanges(source, tableName, ranges, null); - } - public static void requestRanges(InetAddress source, String tableName, Collection<Range> ranges, Runnable callback) { assert ranges.size() > 0; @@ -74,7 +69,7 @@ public class StreamIn // new local sstable Table table = Table.open(remotedesc.ksname); ColumnFamilyStore cfStore = table.getColumnFamilyStore(remotedesc.cfname); - Descriptor localdesc = Descriptor.fromFilename(cfStore.getFlushPath()); + Descriptor localdesc = Descriptor.fromFilename(cfStore.getFlushPath(remote.desc.version)); return new PendingFile(localdesc, remote); } Modified: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/BootstrapTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/BootstrapTest.java?rev=1094789&r1=1094788&r2=1094789&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/BootstrapTest.java (original) +++ 
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/BootstrapTest.java Mon Apr 18 21:58:20 2011 @@ -37,6 +37,7 @@ public class BootstrapTest extends Schem public void testGetNewNames() throws IOException { Descriptor desc = Descriptor.fromFilename(new File("Keyspace1", "Standard1-500-Data.db").toString()); + assert !desc.isLatestVersion; // deliberately test old version; see CASSANDRA-2283 PendingFile inContext = new PendingFile(null, desc, "Data.db", Arrays.asList(new Pair<Long,Long>(0L, 1L))); PendingFile outContext = StreamIn.getContextMapping(inContext); @@ -45,7 +46,8 @@ public class BootstrapTest extends Schem // nothing else should assertEquals(inContext.component, outContext.component); - assertEquals(inContext.desc.ksname, outContext.desc.ksname); - assertEquals(inContext.desc.cfname, outContext.desc.cfname); + assertEquals(desc.ksname, outContext.desc.ksname); + assertEquals(desc.cfname, outContext.desc.cfname); + assertEquals(desc.version, outContext.desc.version); } } Modified: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/SerializationsTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/SerializationsTest.java?rev=1094789&r1=1094788&r2=1094789&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/SerializationsTest.java (original) +++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/streaming/SerializationsTest.java Mon Apr 18 21:58:20 2011 @@ -50,9 +50,9 @@ public class SerializationsTest extends private void testPendingFileWrite() throws IOException { // make sure to test serializing null and a pf with no sstable. 
- PendingFile normal = makePendingFile(true, "fake_component", 100); - PendingFile noSections = makePendingFile(true, "not_real", 0); - PendingFile noSST = makePendingFile(false, "also_fake", 100); + PendingFile normal = makePendingFile(true, 100); + PendingFile noSections = makePendingFile(true, 0); + PendingFile noSST = makePendingFile(false, 100); DataOutputStream out = getOutput("streaming.PendingFile.bin"); PendingFile.serializer().serialize(normal, out); @@ -78,14 +78,14 @@ public class SerializationsTest extends private void testStreamHeaderWrite() throws IOException { - StreamHeader sh0 = new StreamHeader("Keyspace1", 123L, makePendingFile(true, "zz", 100)); - StreamHeader sh1 = new StreamHeader("Keyspace1", 124L, makePendingFile(false, "zz", 100)); + StreamHeader sh0 = new StreamHeader("Keyspace1", 123L, makePendingFile(true, 100)); + StreamHeader sh1 = new StreamHeader("Keyspace1", 124L, makePendingFile(false, 100)); Collection<PendingFile> files = new ArrayList<PendingFile>(); for (int i = 0; i < 50; i++) - files.add(makePendingFile(i % 2 == 0, "aa", 100)); - StreamHeader sh2 = new StreamHeader("Keyspace1", 125L, makePendingFile(true, "bb", 100), files); + files.add(makePendingFile(i % 2 == 0, 100)); + StreamHeader sh2 = new StreamHeader("Keyspace1", 125L, makePendingFile(true, 100), files); StreamHeader sh3 = new StreamHeader("Keyspace1", 125L, null, files); - StreamHeader sh4 = new StreamHeader("Keyspace1", 125L, makePendingFile(true, "bb", 100), new ArrayList<PendingFile>()); + StreamHeader sh4 = new StreamHeader("Keyspace1", 125L, makePendingFile(true, 100), new ArrayList<PendingFile>()); DataOutputStream out = getOutput("streaming.StreamHeader.bin"); StreamHeader.serializer().serialize(sh0, out); @@ -132,13 +132,13 @@ public class SerializationsTest extends in.close(); } - private static PendingFile makePendingFile(boolean sst, String comp, int numSecs) + private static PendingFile makePendingFile(boolean sst, int numSecs) { Descriptor desc = new 
Descriptor("z", new File("path/doesn't/matter"), "Keyspace1", "Standard1", 23, false); List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>(); for (int i = 0; i < numSecs; i++) sections.add(new Pair<Long, Long>(new Long(i), new Long(i * i))); - return new PendingFile(sst ? makeSSTable() : null, desc, comp, sections); + return new PendingFile(sst ? makeSSTable() : null, desc, SSTable.COMPONENT_DATA, sections); } private void testStreamRequestMessageWrite() throws IOException @@ -147,8 +147,8 @@ public class SerializationsTest extends for (int i = 0; i < 5; i++) ranges.add(new Range(new BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i))), new BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i+5))))); StreamRequestMessage msg0 = new StreamRequestMessage(FBUtilities.getLocalAddress(), ranges, "Keyspace1", 123L); - StreamRequestMessage msg1 = new StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(true, "aa", 100), 124L); - StreamRequestMessage msg2 = new StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(false, "aa", 100), 124L); + StreamRequestMessage msg1 = new StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(true, 100), 124L); + StreamRequestMessage msg2 = new StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(false, 100), 124L); DataOutputStream out = getOutput("streaming.StreamRequestMessage.bin"); StreamRequestMessage.serializer().serialize(msg0, out);