Author: jbellis Date: Thu Apr 21 16:13:11 2011 New Revision: 1095765 URL: http://svn.apache.org/viewvc?rev=1095765&view=rev Log: merge from 0.7
Modified: cassandra/branches/cassandra-0.8/ (props changed) cassandra/branches/cassandra-0.8/CHANGES.txt cassandra/branches/cassandra-0.8/contrib/ (props changed) cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed) cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed) cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed) cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed) cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed) cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamIn.java cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/DefsTest.java cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/BootstrapTest.java cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/SerializationsTest.java Propchange: cassandra/branches/cassandra-0.8/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Apr 21 16:13:11 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000 -/cassandra/branches/cassandra-0.7:1026516-1094195,1094604,1094611,1094647,1094796,1094809,1094818,1094831,1095250,1095438,1095473,1095696,1095747 +/cassandra/branches/cassandra-0.7:1026516-1094195,1094604,1094611,1094647,1094780,1094789,1094796,1094809,1094818,1094831,1095250,1095438,1095473,1095696,1095747 /cassandra/branches/cassandra-0.7.0:1053690-1055654 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689 /cassandra/trunk:1090978-1090979 Modified: cassandra/branches/cassandra-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1095765&r1=1095764&r2=1095765&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.8/CHANGES.txt Thu Apr 21 16:13:11 2011 @@ -57,11 +57,15 @@ * use 64KB flush buffer instead of in_memory_compaction_limit (CASSANDRA-2463) * fix duplicate results from CFS.scan (CASSANDRA-2406) * avoid caching token-only decoratedkeys (CASSANDRA-2416) +<<<<<<< .working * count a row deletion as one operation towards memtable threshold (CASSANDRA-2519) * fixes for verifying destination availability under hinted conditions so UE can be thrown intead of timing out (CASSANDRA-2514) * support LOCAL_QUORUM, EACH_QUORUM CLs outside of NTS (CASSANDRA-2516) +======= + * preserve version when streaming data from old sstables (CASSANDRA-2283) +>>>>>>> .merge-right.r1094789 0.7.4 Propchange: cassandra/branches/cassandra-0.8/contrib/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Apr 21 16:13:11 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009 -/cassandra/branches/cassandra-0.7/contrib:1026516-1094195,1094604,1094611,1094647,1094796,1094809,1094818,1094831,1095250,1095438,1095473,1095696,1095747 +/cassandra/branches/cassandra-0.7/contrib:1026516-1094195,1094604,1094611,1094647,1094780,1094789,1094796,1094809,1094818,1094831,1095250,1095438,1095473,1095696,1095747 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689 /cassandra/trunk/contrib:1090978-1090979 Propchange: cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Apr 21 16:13:11 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1094195,1094604,1094611,1094647,1094796,1094809,1094818,1094831,1095250,1095438,1095473,1095696,1095747 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1094195,1094604,1094611,1094647,1094780,1094789,1094796,1094809,1094818,1094831,1095250,1095438,1095473,1095696,1095747 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090978-1090979 Propchange: cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Apr 21 16:13:11 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1094195,1094604,1094611,1094647,1094796,1094809,1094818,1094831,1095250,1095438,1095473,1095696,1095747 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1094195,1094604,1094611,1094647,1094780,1094789,1094796,1094809,1094818,1094831,1095250,1095438,1095473,1095696,1095747 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090978-1090979 Propchange: cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Apr 21 16:13:11 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1094195,1094604,1094611,1094647,1094796,1094809,1094818,1094831,1095250,1095438,1095473,1095696,1095747 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1094195,1094604,1094611,1094647,1094780,1094789,1094796,1094809,1094818,1094831,1095250,1095438,1095473,1095696,1095747 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090978-1090979 Propchange: cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Apr 21 16:13:11 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1094195,1094604,1094611,1094647,1094796,1094809,1094818,1094831,1095250,1095438,1095473,1095696,1095747 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1094195,1094604,1094611,1094647,1094780,1094789,1094796,1094809,1094818,1094831,1095250,1095438,1095473,1095696,1095747 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090978-1090979 Propchange: cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Apr 21 16:13:11 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1094195,1094604,1094611,1094647,1094796,1094809,1094818,1094831,1095250,1095438,1095473,1095696,1095747 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1094195,1094604,1094611,1094647,1094780,1094789,1094796,1094809,1094818,1094831,1095250,1095438,1095473,1095696,1095747 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090978-1090979 Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1095765&r1=1095764&r2=1095765&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Apr 21 16:13:11 2011 @@ -590,17 +590,18 @@ public class ColumnFamilyStore implement * When the sstable object is closed, it will be renamed to a non-temporary * format, so incomplete sstables can be recognized and removed on startup. */ - public String getFlushPath(long estimatedSize) + public String getFlushPath(long estimatedSize, String version) { String location = DatabaseDescriptor.getDataFileLocationForTable(table.name, estimatedSize); if (location == null) throw new RuntimeException("Insufficient disk space to flush " + estimatedSize + " bytes"); - return getTempSSTablePath(location); + return getTempSSTablePath(location, version); } - public String getTempSSTablePath(String directory) + public String getTempSSTablePath(String directory, String version) { - Descriptor desc = new Descriptor(new File(directory), + Descriptor desc = new Descriptor(version, + new File(directory), table.name, columnFamily, fileIndexGenerator.incrementAndGet(), @@ -608,6 +609,11 @@ public class ColumnFamilyStore implement return desc.filenameFor(Component.DATA); } + public String getTempSSTablePath(String directory) + { + return getTempSSTablePath(directory, Descriptor.CURRENT_VERSION); + } + /** flush the given memtable and swap in a new one for its CFS, if it hasn't been frozen already. threadsafe. */ Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean writeCommitLog) { @@ -2097,7 +2103,7 @@ public class ColumnFamilyStore implement public SSTableWriter createFlushWriter(long estimatedRows, long estimatedSize) throws IOException { - return new SSTableWriter(getFlushPath(estimatedSize), estimatedRows, metadata, partitioner); + return new SSTableWriter(getFlushPath(estimatedSize, Descriptor.CURRENT_VERSION), estimatedRows, metadata, partitioner); } public SSTableWriter createCompactionWriter(long estimatedRows, String location) throws IOException Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamIn.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=1095765&r1=1095764&r2=1095765&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamIn.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamIn.java Thu Apr 21 16:13:11 2011 @@ -50,11 +50,6 @@ public class StreamIn /** * Request ranges to be transferred from source to local node */ - public static void requestRanges(InetAddress source, String tableName, Collection<Range> ranges, OperationType type) - { - requestRanges(source, tableName, ranges, null, type); - } - public static void requestRanges(InetAddress source, String tableName, Collection<Range> ranges, Runnable callback, OperationType type) { assert ranges.size() > 0; @@ -80,7 +75,7 @@ public class StreamIn // new local sstable Table table = Table.open(remotedesc.ksname); ColumnFamilyStore cfStore = table.getColumnFamilyStore(remotedesc.cfname); - Descriptor localdesc = Descriptor.fromFilename(cfStore.getFlushPath(remote.size)); + Descriptor localdesc = Descriptor.fromFilename(cfStore.getFlushPath(remote.size, remote.desc.version)); return new PendingFile(localdesc, remote); } Modified: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/DefsTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/DefsTest.java?rev=1095765&r1=1095764&r2=1095765&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/DefsTest.java (original) +++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/DefsTest.java Thu Apr 21 16:13:11 2011 @@ -55,6 +55,7 @@ import org.apache.cassandra.db.migration import org.apache.cassandra.db.migration.UpdateColumnFamily; import org.apache.cassandra.db.migration.UpdateKeyspace; import org.apache.cassandra.io.SerDeUtils; +import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.locator.OldNetworkTopologyStrategy; import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.net.MessagingService; @@ -319,7 +320,7 @@ public class DefsTest extends CleanupHel ColumnFamilyStore store = Table.open(cfm.ksName).getColumnFamilyStore(cfm.cfName); assert store != null; store.forceBlockingFlush(); - store.getFlushPath(1024); + store.getFlushPath(1024, Descriptor.CURRENT_VERSION); assert DefsTable.getFiles(cfm.ksName, cfm.cfName).size() > 0; new DropColumnFamily(ks.name, cfm.cfName).apply(); Modified: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/BootstrapTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/BootstrapTest.java?rev=1095765&r1=1095764&r2=1095765&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/BootstrapTest.java (original) +++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/BootstrapTest.java Thu Apr 21 16:13:11 2011 @@ -37,6 +37,7 @@ public class BootstrapTest extends Schem public void testGetNewNames() throws IOException { Descriptor desc = Descriptor.fromFilename(new File("Keyspace1", "Standard1-500-Data.db").toString()); + assert !desc.isLatestVersion; // deliberately test old version; see CASSANDRA-2283 PendingFile inContext = new PendingFile(null, desc, "Data.db", Arrays.asList(new Pair<Long,Long>(0L, 1L)), OperationType.BOOTSTRAP); PendingFile outContext = StreamIn.getContextMapping(inContext); @@ -45,7 +46,8 @@ public class BootstrapTest extends Schem // nothing else should assertEquals(inContext.component, outContext.component); - assertEquals(inContext.desc.ksname, outContext.desc.ksname); - assertEquals(inContext.desc.cfname, outContext.desc.cfname); + assertEquals(desc.ksname, outContext.desc.ksname); + assertEquals(desc.cfname, outContext.desc.cfname); + assertEquals(desc.version, outContext.desc.version); } } Modified: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/SerializationsTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/SerializationsTest.java?rev=1095765&r1=1095764&r2=1095765&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/SerializationsTest.java (original) +++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/streaming/SerializationsTest.java Thu Apr 21 16:13:11 2011 @@ -50,10 +50,10 @@ public class SerializationsTest extends private void testPendingFileWrite() throws IOException { // make sure to test serializing null and a pf with no sstable. - PendingFile normal = makePendingFile(true, "fake_component", 100, OperationType.BOOTSTRAP); - PendingFile noSections = makePendingFile(true, "not_real", 0, OperationType.AES); - PendingFile noSST = makePendingFile(false, "also_fake", 100, OperationType.RESTORE_REPLICA_COUNT); - + PendingFile normal = makePendingFile(true, 100, OperationType.BOOTSTRAP); + PendingFile noSections = makePendingFile(true, 0, OperationType.AES); + PendingFile noSST = makePendingFile(false, 100, OperationType.RESTORE_REPLICA_COUNT); + DataOutputStream out = getOutput("streaming.PendingFile.bin"); PendingFile.serializer().serialize(normal, out, getVersion()); PendingFile.serializer().serialize(noSections, out, getVersion()); @@ -78,15 +78,15 @@ public class SerializationsTest extends private void testStreamHeaderWrite() throws IOException { - StreamHeader sh0 = new StreamHeader("Keyspace1", 123L, makePendingFile(true, "zz", 100, OperationType.BOOTSTRAP)); - StreamHeader sh1 = new StreamHeader("Keyspace1", 124L, makePendingFile(false, "zz", 100, OperationType.BOOTSTRAP)); + StreamHeader sh0 = new StreamHeader("Keyspace1", 123L, makePendingFile(true, 100, OperationType.BOOTSTRAP)); + StreamHeader sh1 = new StreamHeader("Keyspace1", 124L, makePendingFile(false, 100, OperationType.BOOTSTRAP)); Collection<PendingFile> files = new ArrayList<PendingFile>(); for (int i = 0; i < 50; i++) - files.add(makePendingFile(i % 2 == 0, "aa", 100, OperationType.BOOTSTRAP)); - StreamHeader sh2 = new StreamHeader("Keyspace1", 125L, makePendingFile(true, "bb", 100, OperationType.BOOTSTRAP), files); + files.add(makePendingFile(i % 2 == 0, 100, OperationType.BOOTSTRAP)); + StreamHeader sh2 = new StreamHeader("Keyspace1", 125L, makePendingFile(true, 100, OperationType.BOOTSTRAP), files); StreamHeader sh3 = new StreamHeader("Keyspace1", 125L, null, files); - StreamHeader sh4 = new StreamHeader("Keyspace1", 125L, makePendingFile(true, "bb", 100, OperationType.BOOTSTRAP), new ArrayList<PendingFile>()); - + StreamHeader sh4 = new StreamHeader("Keyspace1", 125L, makePendingFile(true, 100, OperationType.BOOTSTRAP), new ArrayList<PendingFile>()); + DataOutputStream out = getOutput("streaming.StreamHeader.bin"); StreamHeader.serializer().serialize(sh0, out, getVersion()); StreamHeader.serializer().serialize(sh1, out, getVersion()); @@ -132,13 +132,13 @@ public class SerializationsTest extends in.close(); } - private static PendingFile makePendingFile(boolean sst, String comp, int numSecs, OperationType op) + private static PendingFile makePendingFile(boolean sst, int numSecs, OperationType op) { Descriptor desc = new Descriptor("z", new File("path/doesn't/matter"), "Keyspace1", "Standard1", 23, false); List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>(); for (int i = 0; i < numSecs; i++) sections.add(new Pair<Long, Long>(new Long(i), new Long(i * i))); - return new PendingFile(sst ? makeSSTable() : null, desc, comp, sections, op); + return new PendingFile(sst ? makeSSTable() : null, desc, SSTable.COMPONENT_DATA, sections, op); } private void testStreamRequestMessageWrite() throws IOException @@ -147,9 +147,9 @@ public class SerializationsTest extends for (int i = 0; i < 5; i++) ranges.add(new Range(new BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i))), new BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i+5))))); StreamRequestMessage msg0 = new StreamRequestMessage(FBUtilities.getLocalAddress(), ranges, "Keyspace1", 123L, OperationType.RESTORE_REPLICA_COUNT); - StreamRequestMessage msg1 = new StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(true, "aa", 100, OperationType.BOOTSTRAP), 124L); - StreamRequestMessage msg2 = new StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(false, "aa", 100, OperationType.BOOTSTRAP), 124L); - + StreamRequestMessage msg1 = new StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(true, 100, OperationType.BOOTSTRAP), 124L); + StreamRequestMessage msg2 = new StreamRequestMessage(FBUtilities.getLocalAddress(), makePendingFile(false, 100, OperationType.BOOTSTRAP), 124L); + DataOutputStream out = getOutput("streaming.StreamRequestMessage.bin"); StreamRequestMessage.serializer().serialize(msg0, out, getVersion()); StreamRequestMessage.serializer().serialize(msg1, out, getVersion());