[ 
https://issues.apache.org/jira/browse/CASSANDRA-5529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13646434#comment-13646434
 ] 

Rob Timpe commented on CASSANDRA-5529:
--------------------------------------

Thanks for the patch and the quick turnaround.  Verified on the 1.1 branch that 
it fixes my problem.

I'm not really familiar with this api, hence my notes about TBinaryProtocol.  
You solution makes way more sense :)
                
> ColumnFamilyRecordReader fails for large datasets
> -------------------------------------------------
>
>                 Key: CASSANDRA-5529
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-5529
>             Project: Cassandra
>          Issue Type: Bug
>          Components: API, Hadoop
>    Affects Versions: 0.6
>            Reporter: Rob Timpe
>            Assignee: Jonathan Ellis
>             Fix For: 1.1.12, 1.2.5
>
>         Attachments: 5529-1.1.txt, 5529.txt
>
>
> When running mapreduce jobs that read directly from cassandra, the job will 
> sometimes fail with an exception like this:
> java.lang.RuntimeException: com.rockmelt.org.apache.thrift.TException: 
> Message length exceeded: 40
>       at 
> org.apache.cassandra.hadoop.ColumnFamilyRecordReader$StaticRowIterator.maybeInit(ColumnFamilyRecordReader.java:400)
>       at 
> org.apache.cassandra.hadoop.ColumnFamilyRecordReader$StaticRowIterator.computeNext(ColumnFamilyRecordReader.java:406)
>       at 
> org.apache.cassandra.hadoop.ColumnFamilyRecordReader$StaticRowIterator.computeNext(ColumnFamilyRecordReader.java:329)
>       at 
> com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
>       at 
> com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
>       at 
> org.apache.cassandra.hadoop.ColumnFamilyRecordReader.getProgress(ColumnFamilyRecordReader.java:109)
>       at 
> org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.getProgress(MapTask.java:522)
>       at 
> org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:547)
>       at 
> org.apache.hadoop.mapreduce.MapContext.nextKeyValue(MapContext.java:67)
>       at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:143)
>       at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:771)
>       at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375)
>       at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:396)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1132)
>       at org.apache.hadoop.mapred.Child.main(Child.java:249)
> Caused by: com.rockmelt.org.apache.thrift.TException: Message length 
> exceeded: 40
>       at 
> com.rockmelt.org.apache.thrift.protocol.TBinaryProtocol.checkReadLength(TBinaryProtocol.java:393)
>       at 
> com.rockmelt.org.apache.thrift.protocol.TBinaryProtocol.readBinary(TBinaryProtocol.java:363)
>       at org.apache.cassandra.thrift.Column.read(Column.java:528)
>       at 
> org.apache.cassandra.thrift.ColumnOrSuperColumn.read(ColumnOrSuperColumn.java:507)
>       at org.apache.cassandra.thrift.KeySlice.read(KeySlice.java:408)
>       at 
> org.apache.cassandra.thrift.Cassandra$get_range_slices_result.read(Cassandra.java:12422)
>       at 
> com.rockmelt.org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
>       at 
> org.apache.cassandra.thrift.Cassandra$Client.recv_get_range_slices(Cassandra.java:696)
>       at 
> org.apache.cassandra.thrift.Cassandra$Client.get_range_slices(Cassandra.java:680)
>       at 
> org.apache.cassandra.hadoop.ColumnFamilyRecordReader$StaticRowIterator.maybeInit(ColumnFamilyRecordReader.java:362)
>       ... 16 more
> In ColumnFamilyRecordReader#initialize, a TBinaryProtocol is created as 
> follows:
> TTransport transport = 
> ConfigHelper.getInputTransportFactory(conf).openTransport(socket, conf);
> TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport, 
> ConfigHelper.getThriftMaxMessageLength(conf));
> client = new Cassandra.Client(binaryProtocol);
> But each time a call to cassandra is made, checkReadLength(int length) is 
> called in TBinaryProtocol, which includes this:
> readLength_ -= length;
> if (readLength_ < 0) {
>    throw new TException("Message length exceeded: " + length);
> }
> The result is that readLength_ is decreased each time, until it goes negative 
> and exception is thrown.  This will only happen if you're reading a lot of 
> data and your split size is large (which is maybe why people haven't noticed 
> it earlier).  This happens regardless of whether you use wide row support.
> I'm not sure what the right fix is.  It seems like you could either reset the 
> length of TBinaryProtocol after each call or just use a new TBinaryProtocol 
> each time.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to