Updated Branches: refs/heads/cassandra-1.1 3298c2f19 -> 73d828e4e
add ConfigHelper support for Thrift frame and max message sizes patch by Pavel Yaskevich; reviewed by Brandon Williams for CASSANDRA-5188 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/73d828e4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/73d828e4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/73d828e4 Branch: refs/heads/cassandra-1.1 Commit: 73d828e4e8023b9f7ca8fafd12becec34eb59211 Parents: 3298c2f Author: Pavel Yaskevich <xe...@apache.org> Authored: Fri Jan 25 21:49:25 2013 -0800 Committer: Pavel Yaskevich <xe...@apache.org> Committed: Mon Jan 28 10:31:13 2013 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/hadoop/ColumnFamilyOutputFormat.java | 4 +- .../cassandra/hadoop/ColumnFamilyRecordReader.java | 4 +- .../org/apache/cassandra/hadoop/ConfigHelper.java | 34 ++++++++++++++- .../apache/cassandra/thrift/ITransportFactory.java | 3 +- .../apache/cassandra/thrift/TBinaryProtocol.java | 8 +++ .../cassandra/thrift/TFramedTransportFactory.java | 7 ++- 7 files changed, 52 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/73d828e4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1ad77b1..1c414bc 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -3,6 +3,7 @@ * fix ConcurrentModificationException in getBootstrapSource (CASSANDRA-5170) * fix sstable maxtimestamp for row deletes and pre-1.1.1 sstables (CASSANDRA-5153) * fix start key/end token validation for wide row iteration (CASSANDRA-5168) + * add ConfigHelper support for Thrift frame and max message sizes (CASSANDRA-5188) 1.1.9 http://git-wip-us.apache.org/repos/asf/cassandra/blob/73d828e4/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java index e01ada5..caea616 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java @@ -154,8 +154,8 @@ public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutat throws InvalidRequestException, TException, AuthenticationException, AuthorizationException, LoginException { logger.debug("Creating authenticated client for CF output format"); - TTransport transport = ConfigHelper.getOutputTransportFactory(conf).openTransport(socket); - TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport); + TTransport transport = ConfigHelper.getOutputTransportFactory(conf).openTransport(socket, conf); + TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport, ConfigHelper.getThriftMaxMessageLength(conf)); Cassandra.Client client = new Cassandra.Client(binaryProtocol); client.set_keyspace(ConfigHelper.getOutputKeyspace(conf)); if (ConfigHelper.getOutputKeyspaceUserName(conf) != null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/73d828e4/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java index 83e436b..a40e6c5 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java @@ -161,8 +161,8 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap // create connection using thrift String location = getLocation(); socket = new TSocket(location, ConfigHelper.getInputRpcPort(conf)); - TTransport transport = ConfigHelper.getInputTransportFactory(conf).openTransport(socket); - TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport); + TTransport transport = ConfigHelper.getInputTransportFactory(conf).openTransport(socket, conf); + TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport, ConfigHelper.getThriftMaxMessageLength(conf)); client = new Cassandra.Client(binaryProtocol); // log in http://git-wip-us.apache.org/repos/asf/cassandra/blob/73d828e4/src/java/org/apache/cassandra/hadoop/ConfigHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java index 4b49387..ad29903 100644 --- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java +++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java @@ -76,6 +76,8 @@ public class ConfigHelper private static final String OUTPUT_COMPRESSION_CHUNK_LENGTH = "cassandra.output.compression.length"; private static final String INPUT_TRANSPORT_FACTORY_CLASS = "cassandra.input.transport.factory.class"; private static final String OUTPUT_TRANSPORT_FACTORY_CLASS = "cassandra.output.transport.factory.class"; + private static final String THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB = "cassandra.thrift.framed.size_mb"; + private static final String THRIFT_MAX_MESSAGE_LENGTH_IN_MB = "cassandra.thrift.message.max_size_mb"; private static final Logger logger = LoggerFactory.getLogger(ConfigHelper.class); @@ -466,6 +468,34 @@ public class ConfigHelper conf.set(OUTPUT_COMPRESSION_CHUNK_LENGTH, length); } + public static void setThriftFramedTransportSizeInMb(Configuration conf, int frameSizeInMB) + { + conf.setInt(THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB, frameSizeInMB); + } + + /** + * @param conf The configuration to use. + * @return Value (converts MBs to Bytes) set by {@link setThriftFramedTransportSizeInMb(Configuration, int)} or default of 15MB + */ + public static int getThriftFramedTransportSize(Configuration conf) + { + return conf.getInt(THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB, 15) * 1024 * 1024; // 15MB is default in Cassandra + } + + public static void setThriftMaxMessageLengthInMb(Configuration conf, int maxMessageSizeInMB) + { + conf.setInt(THRIFT_MAX_MESSAGE_LENGTH_IN_MB, maxMessageSizeInMB); + } + + /** + * @param conf The configuration to use. + * @return Value (converts MBs to Bytes) set by {@link setThriftMaxMessageLengthInMb(Configuration, int)} or default of 16MB + */ + public static int getThriftMaxMessageLength(Configuration conf) + { + return conf.getInt(THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB, 16) * 1024 * 1024; // 16MB is default in Cassandra + } + public static CompressionParameters getOutputCompressionParamaters(Configuration conf) { if (getOutputCompressionClass(conf) == null) @@ -526,8 +556,8 @@ public class ConfigHelper try { TSocket socket = new TSocket(host, port); - TTransport transport = getInputTransportFactory(conf).openTransport(socket); - return new Cassandra.Client(new TBinaryProtocol(transport)); + TTransport transport = getInputTransportFactory(conf).openTransport(socket, conf); + return new Cassandra.Client(new TBinaryProtocol(transport, getThriftMaxMessageLength(conf))); } catch (LoginException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/73d828e4/src/java/org/apache/cassandra/thrift/ITransportFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/ITransportFactory.java b/src/java/org/apache/cassandra/thrift/ITransportFactory.java index 4940fc6..e3e87c2 100644 --- a/src/java/org/apache/cassandra/thrift/ITransportFactory.java +++ b/src/java/org/apache/cassandra/thrift/ITransportFactory.java @@ -21,6 +21,7 @@ package org.apache.cassandra.thrift; * */ +import org.apache.hadoop.conf.Configuration; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; @@ -30,5 +31,5 @@ import javax.security.auth.login.LoginException; public interface ITransportFactory { - TTransport openTransport(TSocket socket) throws LoginException, TTransportException; + TTransport openTransport(TSocket socket, Configuration conf) throws LoginException, TTransportException; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/73d828e4/src/java/org/apache/cassandra/thrift/TBinaryProtocol.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/TBinaryProtocol.java b/src/java/org/apache/cassandra/thrift/TBinaryProtocol.java index aef6c83..3d59f72 100644 --- a/src/java/org/apache/cassandra/thrift/TBinaryProtocol.java +++ b/src/java/org/apache/cassandra/thrift/TBinaryProtocol.java @@ -37,6 +37,14 @@ public class TBinaryProtocol extends org.apache.thrift.protocol.TBinaryProtocol this(trans, false, true); } + public TBinaryProtocol(TTransport trans, int readLength) + { + this(trans); + + if (readLength > 0) + setReadLength(readLength); + } + public TBinaryProtocol(TTransport trans, boolean strictRead, boolean strictWrite) { super(trans); http://git-wip-us.apache.org/repos/asf/cassandra/blob/73d828e4/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java b/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java index 09ae99e..792618d 100644 --- a/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java +++ b/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java @@ -21,16 +21,19 @@ package org.apache.cassandra.thrift; * */ +import org.apache.cassandra.hadoop.ConfigHelper; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; +import org.apache.hadoop.conf.Configuration; + public class TFramedTransportFactory implements ITransportFactory { - public TTransport openTransport(TSocket socket) throws TTransportException + public TTransport openTransport(TSocket socket, Configuration conf) throws TTransportException { - TTransport transport = new TFramedTransport(socket); + TTransport transport = new TFramedTransport(socket, ConfigHelper.getThriftFramedTransportSize(conf)); transport.open(); return transport; }