Author: brandonwilliams Date: Wed Nov 30 16:38:36 2011 New Revision: 1208499
URL: http://svn.apache.org/viewvc?rev=1208499&view=rev Log: Bulk loader is no longer a fat client, hadoop bulk loader output format. Patch by brandonwilliams, reviewed by Yuki Morishita for CASSANDRA-3045 Added: cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java (with props) cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java (with props) Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/config/Config.java cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java cassandra/trunk/src/java/org/apache/cassandra/net/Header.java cassandra/trunk/src/java/org/apache/cassandra/net/Message.java cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java cassandra/trunk/src/java/org/apache/cassandra/tools/BulkLoader.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1208499&r1=1208498&r2=1208499&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Wed Nov 30 16:38:36 2011 @@ -9,6 +9,8 @@ (CASSANDRA-3116) * recognize that "SELECT first ... *" isn't really "SELECT *" (CASSANDRA-3445) * Use faster bytes comparison (CASSANDRA-3434) + * Bulk loader is no longer a fat client, (HADOOP) bulk load output format + (CASSANDRA-3045) 1.0.5 Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1208499&r1=1208498&r2=1208499&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Wed Nov 30 16:38:36 2011 @@ -109,7 +109,7 @@ public class Config public RequestSchedulerId request_scheduler_id; public RequestSchedulerOptions request_scheduler_options; - public EncryptionOptions encryption_options; + public EncryptionOptions encryption_options = new EncryptionOptions(); public Integer index_interval = 128; Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1208499&r1=1208498&r2=1208499&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Nov 30 16:38:36 2011 @@ -107,30 +107,38 @@ public class DatabaseDescriptor return url; } + + public static void initDefaultsOnly() + { + conf = new Config(); + } static { try { - URL url = getStorageConfigURL(); - logger.info("Loading settings from " + url); - - InputStream input = null; - try - { - input = url.openStream(); - } - catch (IOException e) + // only load yaml if conf wasn't already set + if (conf == null) { - // getStorageConfigURL should have ruled this out - throw new AssertionError(e); + URL url = getStorageConfigURL(); + logger.info("Loading settings from " + url); + InputStream input = null; + try + { + input = url.openStream(); + } + catch (IOException e) + { + // getStorageConfigURL should have ruled this out + throw new AssertionError(e); + } + org.yaml.snakeyaml.constructor.Constructor constructor = new org.yaml.snakeyaml.constructor.Constructor(Config.class); + TypeDescription seedDesc = new TypeDescription(SeedProviderDef.class); + seedDesc.putMapPropertyType("parameters", String.class, String.class); + constructor.addTypeDescription(seedDesc); + Yaml yaml = new Yaml(new Loader(constructor)); + conf = (Config)yaml.load(input); } - org.yaml.snakeyaml.constructor.Constructor constructor = new org.yaml.snakeyaml.constructor.Constructor(Config.class); - TypeDescription seedDesc = new TypeDescription(SeedProviderDef.class); - seedDesc.putMapPropertyType("parameters", String.class, String.class); - constructor.addTypeDescription(seedDesc); - Yaml yaml = new Yaml(new Loader(constructor)); - conf = (Config)yaml.load(input); if (conf.commitlog_sync == null) { Added: cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java?rev=1208499&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java Wed Nov 30 16:38:36 2011 @@ -0,0 +1,99 @@ +package org.apache.cassandra.hadoop; + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.thrift.Mutation; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.*; + +public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>> + implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>> +{ + private static final Logger logger = LoggerFactory.getLogger(BulkOutputFormat.class); + + @Override + public void checkOutputSpecs(JobContext context) + { + checkOutputSpecs(context.getConfiguration()); + } + + private void checkOutputSpecs(Configuration conf) + { + if (ConfigHelper.getOutputKeyspace(conf) == null || ConfigHelper.getOutputColumnFamily(conf) == null) + { + throw new UnsupportedOperationException("you must set the keyspace and columnfamily with setColumnFamily()"); + } + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException + { + return new NullOutputCommitter(); + } + + /** Fills the deprecated OutputFormat interface for streaming. */ + @Deprecated + public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException + { + checkOutputSpecs(job); + } + + /** Fills the deprecated OutputFormat interface for streaming. */ + @Deprecated + public BulkRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException + { + return new BulkRecordWriter(job); + } + + @Override + public BulkRecordWriter getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException + { + return new BulkRecordWriter(context); + } + + public static class NullOutputCommitter extends OutputCommitter + { + public void abortTask(TaskAttemptContext taskContext) { } + + public void cleanupJob(JobContext jobContext) { } + + public void commitTask(TaskAttemptContext taskContext) { } + + public boolean needsTaskCommit(TaskAttemptContext taskContext) + { + return false; + } + + public void setupJob(JobContext jobContext) { } + + public void setupTask(TaskAttemptContext taskContext) { } + } +} Propchange: cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java ------------------------------------------------------------------------------ svn:eol-style = native Added: cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java?rev=1208499&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java Wed Nov 30 16:38:36 2011 @@ -0,0 +1,232 @@ +package org.apache.cassandra.hadoop; + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.net.UnknownHostException; +import java.util.*; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.SSTableLoader; +import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter; +import org.apache.cassandra.thrift.*; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.thrift.protocol.*; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + + +final class BulkRecordWriter extends RecordWriter<ByteBuffer,List<Mutation>> +implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> +{ + private final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir"; + private final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize"; + private final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits"; + private final static String IS_SUPERCF = "mapreduce.output.bulkoutputformat.issuper"; + private final Configuration conf; + private boolean isSuper = false; + private SSTableSimpleUnsortedWriter writer; + private SSTableLoader loader; + + static { + DatabaseDescriptor.initDefaultsOnly(); // make sure DD doesn't load yaml + } + + BulkRecordWriter(TaskAttemptContext context) throws IOException + { + this(context.getConfiguration()); + } + + BulkRecordWriter(Configuration conf) throws IOException + { + this.conf = conf; + DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.valueOf(conf.get(STREAM_THROTTLE_MBITS, "0"))); + String keyspace = ConfigHelper.getOutputKeyspace(conf); + File outputdir = new File(getOutputLocation() + File.separator + keyspace); //dir must be named by ks for the loader + outputdir.mkdirs(); + this.isSuper = Boolean.valueOf(conf.get(IS_SUPERCF)); + AbstractType subcomparator = null; + if (isSuper) + subcomparator = BytesType.instance; + this.writer = new SSTableSimpleUnsortedWriter( + outputdir, + keyspace, + ConfigHelper.getOutputColumnFamily(conf), + BytesType.instance, + subcomparator, + Integer.valueOf(conf.get(BUFFER_SIZE_IN_MB, "64"))); + this.loader = new SSTableLoader(outputdir, new ExternalClient(ConfigHelper.getInitialAddress(conf), ConfigHelper.getRpcPort(conf)), new NullOutputHandler()); + } + + private String getOutputLocation() throws IOException + { + String dir = conf.get(OUTPUT_LOCATION, conf.get("mapred.local.dir")); + if (dir == null) + throw new IOException("Output directory not defined, if hadoop is not setting mapred.local.dir then define " + OUTPUT_LOCATION); + return dir; + } + + + @Override + public void write(ByteBuffer keybuff, List<Mutation> value) throws IOException + { + writer.newRow(keybuff); + for (Mutation mut : value) + { + if (isSuper) + { + writer.newSuperColumn(mut.getColumn_or_supercolumn().getSuper_column().name); + for (Column column : mut.getColumn_or_supercolumn().getSuper_column().columns) + writer.addColumn(column.name, column.value, column.timestamp); + } + else + writer.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp); + } + } + + @Override + public void close(TaskAttemptContext context) throws IOException, InterruptedException + { + close(); + } + + /** Fills the deprecated RecordWriter interface for streaming. */ + @Deprecated + public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException + { + close(); + } + + private void close() throws IOException + { + writer.close(); + try + { + loader.stream().get(); + } + catch (InterruptedException e) + { + throw new IOException(e); + } + } + + static class ExternalClient extends SSTableLoader.Client + { + private final Map<String, Set<String>> knownCfs = new HashMap<String, Set<String>>(); + private String hostlist; + private int rpcPort; + + public ExternalClient(String hostlist, int port) + { + super(); + this.hostlist = hostlist; + this.rpcPort = port; + } + + public void init(String keyspace) + { + Set<InetAddress> hosts = new HashSet<InetAddress>(); + String[] nodes = hostlist.split(","); + for (String node : nodes) + { + try + { + hosts.add(InetAddress.getByName(node)); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + Iterator<InetAddress> hostiter = hosts.iterator(); + while (hostiter.hasNext()) + { + try + { + InetAddress host = hostiter.next(); + Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort); + List<TokenRange> tokenRanges = client.describe_ring(keyspace); + List<KsDef> ksDefs = client.describe_keyspaces(); + + setPartitioner(client.describe_partitioner()); + Token.TokenFactory tkFactory = getPartitioner().getTokenFactory(); + + for (TokenRange tr : tokenRanges) + { + Range range = new Range(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token)); + for (String ep : tr.endpoints) + { + addRangeForEndpoint(range, InetAddress.getByName(ep)); + } + } + + for (KsDef ksDef : ksDefs) + { + Set<String> cfs = new HashSet<String>(); + for (CfDef cfDef : ksDef.cf_defs) + cfs.add(cfDef.name); + knownCfs.put(ksDef.name, cfs); + } + break; + } + catch (Exception e) + { + if (!hostiter.hasNext()) + throw new RuntimeException("Could not retrieve endpoint ranges: ", e); + } + } + } + + public boolean validateColumnFamily(String keyspace, String cfName) + { + Set<String> cfs = knownCfs.get(keyspace); + return cfs != null && cfs.contains(cfName); + } + + private static Cassandra.Client createThriftClient(String host, int port) throws TTransportException + { + TSocket socket = new TSocket(host, port); + TTransport trans = new TFramedTransport(socket); + trans.open(); + TProtocol protocol = new org.apache.thrift.protocol.TBinaryProtocol(trans); + return new Cassandra.Client(protocol); + } + } + + static class NullOutputHandler implements SSTableLoader.OutputHandler + { + public void output(String msg) {} + + public void debug(String msg) {} + } +} Propchange: cassandra/trunk/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java?rev=1208499&r1=1208498&r2=1208499&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java Wed Nov 30 16:38:36 2011 @@ -27,9 +27,11 @@ import java.util.concurrent.CountDownLat import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.apache.cassandra.config.ConfigurationException; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.*; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; /** @@ -84,7 +86,7 @@ public class SSTableLoader try { - sstables.add(SSTableReader.open(desc, components, null, StorageService.getPartitioner())); + sstables.add(SSTableReader.open(desc, components, null, client.getPartitioner())); } catch (IOException e) { @@ -227,15 +229,15 @@ public class SSTableLoader public static abstract class Client { private final Map<InetAddress, Collection<Range>> endpointToRanges = new HashMap<InetAddress, Collection<Range>>(); + private IPartitioner partitioner; /** * Initialize the client. * Perform any step necessary so that after the call to the this * method: - * * StorageService is correctly initialized (so that gossip and - * messaging service is too) + * * partitioner is initialized * * getEndpointToRangesMap() returns a correct map - * This method is guaranted to be called before any other method of a + * This method is guaranteed to be called before any other method of a * client. */ public abstract void init(String keyspace); @@ -256,6 +258,16 @@ public class SSTableLoader return endpointToRanges; } + protected void setPartitioner(String partclass) throws ConfigurationException + { + this.partitioner = FBUtilities.newPartitioner(partclass); + } + + public IPartitioner getPartitioner() + { + return partitioner; + } + protected void addRangeForEndpoint(Range range, InetAddress endpoint) { Collection<Range> ranges = endpointToRanges.get(endpoint); Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Header.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java?rev=1208499&r1=1208498&r2=1208499&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/Header.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/Header.java Wed Nov 30 16:38:36 2011 @@ -40,7 +40,7 @@ public class Header serializer_ = new HeaderSerializer(); } - static IVersionedSerializer<Header> serializer() + public static IVersionedSerializer<Header> serializer() { return serializer_; } Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Message.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=1208499&r1=1208498&r2=1208499&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Wed Nov 30 16:38:36 2011 @@ -30,7 +30,7 @@ public class Message private final byte[] body_; private final transient int version; - Message(Header header, byte[] body, int version) + public Message(Header header, byte[] body, int version) { assert header != null; assert body != null; Modified: cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1208499&r1=1208498&r2=1208499&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Wed Nov 30 16:38:36 2011 @@ -162,7 +162,7 @@ public class OutboundTcpConnection exten } } - static void write(Message message, String id, DataOutputStream out) throws IOException + public static void write(Message message, String id, DataOutputStream out) throws IOException { /* Setting up the protocol header. This is 4 bytes long Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java?rev=1208499&r1=1208498&r2=1208499&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java Wed Nov 30 16:38:36 2011 @@ -18,9 +18,7 @@ package org.apache.cassandra.streaming; -import java.io.File; -import java.io.IOException; -import java.io.OutputStream; +import java.io.*; import java.net.InetAddress; import java.net.Socket; import java.nio.ByteBuffer; @@ -30,7 +28,11 @@ import org.apache.cassandra.gms.Gossiper import org.apache.cassandra.io.compress.CompressedRandomAccessReader; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.net.Header; +import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.OutboundTcpConnection; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.Throttle; @@ -54,12 +56,15 @@ public class FileStreamTask extends Wrap // communication socket private Socket socket; - // socket's output stream + // socket's output/input stream private OutputStream output; + private OutputStream compressedoutput; + private DataInputStream input; // allocate buffer to use for transfers only once private final byte[] transferBuffer = new byte[CHUNK_SIZE]; // outbound global throughput limiter private final Throttle throttle; + private final StreamReplyVerbHandler handler = new StreamReplyVerbHandler(); public FileStreamTask(StreamHeader header, InetAddress to) { @@ -89,6 +94,12 @@ public class FileStreamTask extends Wrap // successfully connected: stream. // (at this point, if we fail, it is the receiver's job to re-request) stream(); + if (StreamOutSession.get(to, header.sessionId).getFiles().size() == 0) + { + // we are the last of our kind, receive the final confirmation before closing + receiveReply(); + logger.info("Finished streaming session to {}", to); + } } finally { @@ -125,7 +136,7 @@ public class FileStreamTask extends Wrap : RandomAccessReader.open(new File(header.file.getFilename()), true); // setting up data compression stream - output = new LZFOutputStream(output); + compressedoutput = new LZFOutputStream(output); try { @@ -149,11 +160,13 @@ public class FileStreamTask extends Wrap } // make sure that current section is send - output.flush(); + compressedoutput.flush(); if (logger.isDebugEnabled()) logger.debug("Bytes transferred " + bytesTransferred + "/" + header.file.size); } + // receive reply confirmation + receiveReply(); } finally { @@ -162,6 +175,25 @@ public class FileStreamTask extends Wrap } } + private void receiveReply() throws IOException + { + MessagingService.validateMagic(input.readInt()); + int msheader = input.readInt(); + assert MessagingService.getBits(msheader, 3, 1) == 0 : "Stream received before stream reply"; + int version = MessagingService.getBits(msheader, 15, 8); + + int totalSize = input.readInt(); + String id = input.readUTF(); + Header header = Header.serializer().deserialize(input, version); + + int bodySize = input.readInt(); + byte[] body = new byte[bodySize]; + input.readFully(body); + Message message = new Message(header, body, version); + assert message.getVerb() == StorageService.Verb.STREAM_REPLY : "Non-reply message received on stream socket"; + handler.doVerb(message, id); + } + /** * Sequentially read bytes from the file and write them to the output stream * @@ -178,7 +210,7 @@ public class FileStreamTask extends Wrap int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred); reader.readFully(transferBuffer, 0, toTransfer); - output.write(transferBuffer, 0, toTransfer); + compressedoutput.write(transferBuffer, 0, toTransfer); throttle.throttleDelta(toTransfer); return toTransfer; @@ -198,6 +230,7 @@ public class FileStreamTask extends Wrap { socket = MessagingService.instance().getConnectionPool(to).newSocket(); output = socket.getOutputStream(); + input = new DataInputStream(socket.getInputStream()); break; } catch (IOException e) Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1208499&r1=1208498&r2=1208499&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Wed Nov 30 16:38:36 2011 @@ -60,6 +60,8 @@ public class IncomingStreamReader InetAddress host = header.broadcastAddress != null ? header.broadcastAddress : ((InetSocketAddress)socket.getRemoteSocketAddress()).getAddress(); session = StreamInSession.get(host, header.sessionId); + session.setSocket(socket); + session.addFiles(header.pendingFiles); // set the current file we are streaming so progress shows up in jmx session.setCurrentFile(header.file); @@ -88,18 +90,13 @@ public class IncomingStreamReader try { reader = streamIn(dis, localFile, remoteFile); + session.finished(remoteFile, reader); } catch (IOException ex) { retry(); throw ex; } - finally - { - dis.close(); - } - - session.finished(remoteFile, reader); } session.closeIfFinished(); @@ -169,6 +166,7 @@ public class IncomingStreamReader session.retry(remoteFile); /* Delete the orphaned file. */ - FileUtils.deleteWithConfirm(new File(localFile.getFilename())); + if (new File(localFile.getFilename()).isFile()) + FileUtils.deleteWithConfirm(new File(localFile.getFilename())); } } Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1208499&r1=1208498&r2=1208499&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Wed Nov 30 16:38:36 2011 @@ -18,8 +18,10 @@ package org.apache.cassandra.streaming; +import java.io.DataOutputStream; import java.io.IOException; import java.net.InetAddress; +import java.net.Socket; import java.util.*; import java.util.concurrent.ConcurrentMap; @@ -27,7 +29,8 @@ import org.apache.cassandra.db.ColumnFam import org.apache.cassandra.db.Table; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.OutboundTcpConnection; import org.apache.cassandra.utils.Pair; import org.cliffc.high_scale_lib.NonBlockingHashMap; @@ -49,6 +52,7 @@ public class StreamInSession private String table; private final List<SSTableReader> readers = new ArrayList<SSTableReader>(); private PendingFile current; + private Socket socket; private StreamInSession(Pair<InetAddress, Long> context, Runnable callback) { @@ -89,6 +93,11 @@ public class StreamInSession this.table = table; } + public void setSocket(Socket socket) + { + this.socket = socket; + } + public void addFiles(Collection<PendingFile> files) { for (PendingFile file : files) @@ -111,16 +120,23 @@ public class StreamInSession current = null; StreamReply reply = new StreamReply(remoteFile.getFilename(), getSessionId(), StreamReply.Status.FILE_FINISHED); // send a StreamStatus message telling the source node it can delete this file - MessagingService.instance().sendOneWay(reply.getMessage(Gossiper.instance.getVersion(getHost())), getHost()); + sendMessage(reply.getMessage(Gossiper.instance.getVersion(getHost()))); + logger.debug("ack {} sent for {}", reply, remoteFile); } public void retry(PendingFile remoteFile) throws IOException { StreamReply reply = new StreamReply(remoteFile.getFilename(), getSessionId(), StreamReply.Status.FILE_RETRY); logger.info("Streaming of file {} from {} failed: requesting a retry.", remoteFile, this); - MessagingService.instance().sendOneWay(reply.getMessage(Gossiper.instance.getVersion(getHost())), getHost()); + sendMessage(reply.getMessage(Gossiper.instance.getVersion(getHost()))); } + public void sendMessage(Message message) throws IOException + { + OutboundTcpConnection.write(message, String.valueOf(getSessionId()), new DataOutputStream(socket.getOutputStream())); + } + + public void closeIfFinished() throws IOException { if (files.isEmpty()) @@ -160,7 +176,14 @@ public class StreamInSession // send reply to source that we're done StreamReply reply = new StreamReply("", getSessionId(), StreamReply.Status.SESSION_FINISHED); logger.info("Finished streaming session {} from {}", getSessionId(), getHost()); - MessagingService.instance().sendOneWay(reply.getMessage(Gossiper.instance.getVersion(getHost())), getHost()); + try + { + OutboundTcpConnection.write(reply.getMessage(Gossiper.instance.getVersion(getHost())), context.right.toString(), new DataOutputStream(socket.getOutputStream())); + } + finally + { + socket.close(); + } if (callback != null) callback.run(); Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java?rev=1208499&r1=1208498&r2=1208499&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java Wed Nov 30 16:38:36 2011 @@ -55,6 +55,7 @@ public class StreamReplyVerbHandler impl switch (reply.action) { case FILE_FINISHED: + logger.info("Successfully sent {} to {}", reply.file, message.getFrom()); session.validateCurrentFile(reply.file); session.startNext(); break; Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/BulkLoader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/BulkLoader.java?rev=1208499&r1=1208498&r2=1208499&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/tools/BulkLoader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/tools/BulkLoader.java Wed Nov 30 16:38:36 2011 @@ -28,9 +28,7 @@ import org.apache.cassandra.config.CFMet import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.io.sstable.SSTableLoader; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.PendingFile; import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.FBUtilities; @@ -52,13 +50,15 @@ public class BulkLoader private static final String HELP_OPTION = "help"; private static final String NOPROGRESS_OPTION = "no-progress"; private static final String IGNORE_NODES_OPTION = "ignore"; + private static final String INITIAL_HOST_ADDRESS_OPTION = "nodes"; + private static final String RPC_PORT_OPTION = "port"; public static void main(String args[]) throws IOException { LoaderOptions options = LoaderOptions.parseArgs(args); try { - SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options), options); + SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options, options.hosts, options.rpcPort), options); SSTableLoader.LoaderFuture future = loader.stream(options.ignores); if (options.noProgress) @@ -171,38 +171,34 @@ public class BulkLoader { private final Map<String, Set<String>> knownCfs = new HashMap<String, Set<String>>(); private final SSTableLoader.OutputHandler outputHandler; + private Set<InetAddress> hosts = new HashSet<InetAddress>(); + private int rpcPort; - public ExternalClient(SSTableLoader.OutputHandler outputHandler) + public ExternalClient(SSTableLoader.OutputHandler outputHandler, Set<InetAddress> hosts, int port) { super(); this.outputHandler = outputHandler; + this.hosts = hosts; + this.rpcPort = port; } public void init(String keyspace) { - outputHandler.output(String.format("Starting client (and waiting %d seconds for gossip) ...", StorageService.RING_DELAY / 1000)); - try + Iterator<InetAddress> hostiter = hosts.iterator(); + while (hostiter.hasNext()) { - // Init gossip - StorageService.instance.initClient(); + try + { - Set<InetAddress> hosts = Gossiper.instance.getLiveMembers(); - hosts.remove(FBUtilities.getBroadcastAddress()); - if (hosts.isEmpty()) - throw new IllegalStateException("Cannot load any sstable, no live member found in the cluster"); - - // Query endpoint to ranges map and schemas from thrift - String host = hosts.iterator().next().toString().substring(1); - int port = DatabaseDescriptor.getRpcPort(); - - Cassandra.Client client = createThriftClient(host, port); - List<TokenRange> tokenRanges = client.describe_ring(keyspace); - List<KsDef> ksDefs = client.describe_keyspaces(); + // Query endpoint to ranges map and schemas from thrift + InetAddress host = hostiter.next(); + Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort); + List<TokenRange> tokenRanges = client.describe_ring(keyspace); + List<KsDef> ksDefs = client.describe_keyspaces(); - Token.TokenFactory tkFactory = StorageService.getPartitioner().getTokenFactory(); + setPartitioner(client.describe_partitioner()); + Token.TokenFactory tkFactory = getPartitioner().getTokenFactory(); - try - { for (TokenRange tr : tokenRanges) { Range range = new Range(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token)); @@ -211,30 +207,22 @@ public class BulkLoader addRangeForEndpoint(range, InetAddress.getByName(ep)); } } - } - catch (UnknownHostException e) - { - throw new RuntimeException("Got an unknow host from describe_ring()", e); - } - for (KsDef ksDef : ksDefs) + for (KsDef ksDef : ksDefs) + { + Set<String> cfs = new HashSet<String>(); + for (CfDef cfDef : ksDef.cf_defs) + cfs.add(cfDef.name); + knownCfs.put(ksDef.name, cfs); + } + break; + } + catch (Exception e) { - Set<String> cfs = new HashSet<String>(); - for (CfDef cfDef : ksDef.cf_defs) - cfs.add(cfDef.name); - knownCfs.put(ksDef.name, cfs); + if (!hostiter.hasNext()) + throw new RuntimeException("Could not retrieve endpoint ranges: ", e); } } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - - @Override - public void stop() - { - StorageService.instance.stopClient(); } public boolean validateColumnFamily(String keyspace, String cfName) @@ -260,7 +248,9 @@ public class BulkLoader public boolean debug; public boolean verbose; public boolean noProgress; + public int rpcPort = 9160; + public Set<InetAddress> hosts = new HashSet<InetAddress>(); public Set<InetAddress> ignores = new HashSet<InetAddress>(); LoaderOptions(File directory) @@ -312,6 +302,32 @@ public class BulkLoader opts.verbose = cmd.hasOption(VERBOSE_OPTION); opts.noProgress = cmd.hasOption(NOPROGRESS_OPTION); + if (cmd.hasOption(RPC_PORT_OPTION)) + opts.rpcPort = Integer.valueOf(cmd.getOptionValue(RPC_PORT_OPTION)); + + if (cmd.hasOption(INITIAL_HOST_ADDRESS_OPTION)) + { + String[] nodes = cmd.getOptionValue(INITIAL_HOST_ADDRESS_OPTION).split(","); + try + { + for (String node : nodes) + { + opts.hosts.add(InetAddress.getByName(node.trim())); + } + } + catch (UnknownHostException e) + { + errorMsg("Unknown host: " + e.getMessage(), options); + } + + } + else + { + System.err.println("Initial hosts must be specified (-d)"); + printUsage(options); + System.exit(1); + } + if (cmd.hasOption(IGNORE_NODES_OPTION)) { String[] nodes = cmd.getOptionValue(IGNORE_NODES_OPTION).split(","); @@ -363,6 +379,8 @@ public class BulkLoader options.addOption("h", HELP_OPTION, "display this help message"); options.addOption(null, NOPROGRESS_OPTION, "don't display progress"); options.addOption("i", IGNORE_NODES_OPTION, "NODES", "don't stream to this (comma separated) list of nodes"); + options.addOption("d", INITIAL_HOST_ADDRESS_OPTION, "initial hosts", "try to connect to these hosts (comma separated) initially for ring information"); + options.addOption("p", RPC_PORT_OPTION, "rpc port", "port used for rpc (default 9160)"); return options; }