Author: brandonwilliams Date: Thu Nov 17 22:17:34 2011 New Revision: 1203394
URL: http://svn.apache.org/viewvc?rev=1203394&view=rev Log: Streaming uses BroadcastAddress instead of the remote socket. Patch by Vijay, reviewed by brandonwilliams for CASSANDRA-3503 Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1203394&r1=1203393&r2=1203394&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Thu Nov 17 22:17:34 2011 @@ -19,6 +19,7 @@ package org.apache.cassandra.streaming; import java.io.*; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.util.Collections; @@ -56,8 +57,9 @@ public class IncomingStreamReader public IncomingStreamReader(StreamHeader header, Socket socket) throws IOException { this.socket = socket; - InetSocketAddress remoteAddress = (InetSocketAddress)socket.getRemoteSocketAddress(); - session = StreamInSession.get(remoteAddress.getAddress(), header.sessionId); + InetAddress host = header.broadcastAddress != null ? header.broadcastAddress + : ((InetSocketAddress)socket.getRemoteSocketAddress()).getAddress(); + session = StreamInSession.get(host, header.sessionId); session.addFiles(header.pendingFiles); // set the current file we are streaming so progress shows up in jmx session.setCurrentFile(header.file); Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java?rev=1203394&r1=1203393&r2=1203394&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java Thu Nov 17 22:17:34 2011 @@ -22,12 +22,16 @@ package org.apache.cassandra.streaming; */ import java.io.*; +import java.net.InetAddress; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.net.CompactEndpointSerializationHelper; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.FBUtilities; public class StreamHeader { @@ -54,6 +58,9 @@ public class StreamHeader /** files to add to the session */ public final Collection<PendingFile> pendingFiles; + /** Address of the sender **/ + public final InetAddress broadcastAddress; + public StreamHeader(String table, long sessionId, PendingFile file) { this(table, sessionId, file, Collections.<PendingFile>emptyList()); @@ -61,10 +68,16 @@ public class StreamHeader public StreamHeader(String table, long sessionId, PendingFile first, Collection<PendingFile> pendingFiles) { + this(table, sessionId, first, pendingFiles, FBUtilities.getBroadcastAddress()); + } + + public StreamHeader(String table, long sessionId, PendingFile first, Collection<PendingFile> pendingFiles, InetAddress broadcastAddress) + { this.table = table; this.sessionId = sessionId; this.file = first; this.pendingFiles = pendingFiles; + this.broadcastAddress = broadcastAddress; } private static class StreamHeaderSerializer implements IVersionedSerializer<StreamHeader> @@ -79,6 +92,7 @@ public class StreamHeader { PendingFile.serializer().serialize(file, dos, version); } + CompactEndpointSerializationHelper.serialize(sh.broadcastAddress, dos); } public StreamHeader deserialize(DataInput dis, int version) throws IOException @@ -93,8 +107,10 @@ public class StreamHeader { pendingFiles.add(PendingFile.serializer().deserialize(dis, version)); } - - return new StreamHeader(table, sessionId, file, pendingFiles); + InetAddress bca = null; + if (version > MessagingService.VERSION_10) + bca = CompactEndpointSerializationHelper.deserialize(dis); + return new StreamHeader(table, sessionId, file, pendingFiles, bca); } public long serializedSize(StreamHeader streamHeader, int version)