[ https://issues.apache.org/jira/browse/KAFKA-946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13706465#comment-13706465 ]

Jay Kreps commented on KAFKA-946:
---------------------------------

From Richard:

Sorry, it took a while to remember the context.
In a Kafka Message, the checksum is computed over the whole message: header and 
payload included.

The contrib code passes only the Message payload to the mapper, not the whole 
buffer. I believe the reason for this is that we wanted to hand the mapper just 
the message data, without any of the Kafka-specific bits. The Message that is 
created in SimpleKafkaETLMapper is then constructed from the wrong bytes: the 
payload alone, rather than the full buffer the checksum was computed over. It 
can be argued that this is desirable; for instance, mappers can decode the byte 
buffer directly into Avro without stripping away the header or dealing with 
Kafka Messages at all.
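The mismatch is easy to demonstrate in isolation. The sketch below is purely illustrative (a made-up 6-byte header and plain java.util.zip.CRC32, not the real Kafka wire format): a checksum computed over the whole message cannot be reproduced from the payload bytes alone.

```java
import java.util.zip.CRC32;

public class ChecksumScopeDemo {
    // CRC32 over an arbitrary byte range.
    static long crc(byte[] data, int offset, int length) {
        CRC32 crc = new CRC32();
        crc.update(data, offset, length);
        return crc.getValue();
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical wire layout: a 6-byte "header" followed by the payload.
        byte[] header = {0, 0, 0, 1, 0, 42};
        byte[] payload = "hello".getBytes("UTF-8");
        byte[] whole = new byte[header.length + payload.length];
        System.arraycopy(header, 0, whole, 0, header.length);
        System.arraycopy(payload, 0, whole, header.length, payload.length);

        // What the broker stored vs. what a payload-only mapper recomputes.
        long checksumOnWhole = crc(whole, 0, whole.length);
        long checksumOnPayload = crc(payload, 0, payload.length);
        System.out.println(checksumOnWhole == checksumOnPayload);
    }
}
```

This is exactly why the verification in SimpleKafkaETLMapper fails: the stored checksum covers bytes the mapper never sees.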

Also, changing the KafkaETLContext code could affect a lot of users; this is 
definitely not a backwards-compatible change. It can also be argued that the 
BytesWritable should only contain the payload bytes, and that checksum 
verification should have happened well before the mapper gets the message.

However, I think that Sam's fix still has merit. It would be good for 
KafkaETLContext to pass the whole Message buffer instead of the payload, and 
the RecordReader could strip away the kafka bits before giving the payload to 
the mapper. Perhaps add a config switch to return either just the payload or 
the whole kafka message buffer?
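A rough illustration of that approach (entirely hypothetical: a fixed-size header stand-in and java.util.zip.CRC32 rather than the real 0.8 message format or the real KafkaETLRecordReader API) could look like this, with the checksum verified where the full buffer is still available:

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class RecordReaderSketch {
    // Stand-in for the Kafka header; the real 0.8 header is variable-length
    // (CRC, magic, attributes, key), so this constant is illustrative only.
    static final int HEADER_SIZE = 6;

    // CRC32 over the buffer's remaining bytes, without moving its position.
    static long crc32(ByteBuffer buf) {
        byte[] bytes = new byte[buf.remaining()];
        buf.duplicate().get(bytes);
        CRC32 crc = new CRC32();
        crc.update(bytes, 0, bytes.length);
        return crc.getValue();
    }

    // Verify the checksum against the *whole* message buffer, then hand back
    // either the full buffer or just the payload, driven by a config switch.
    static ByteBuffer nextValue(ByteBuffer wholeMessage, long expectedChecksum,
                                boolean payloadOnly) throws java.io.IOException {
        if (crc32(wholeMessage) != expectedChecksum)
            throw new java.io.IOException("Invalid message checksum");
        if (!payloadOnly)
            return wholeMessage;
        ByteBuffer payload = wholeMessage.duplicate();
        payload.position(payload.position() + HEADER_SIZE); // strip the kafka bits
        return payload.slice();
    }
}
```

With the switch set to payload-only, existing mappers keep seeing exactly the bytes they see today, while verification happens before the header is stripped.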

Additional thoughts:
I assume there are plenty of users of this code. Anyone who uses 
KafkaETLContext directly will find that the patch breaks their code. However, 
for those who use KafkaETLContext through KafkaETLRecordReader (as they 
should), there is a way to make the change backwards compatible.

The checksumming and payload-stripping code could go into the RecordReader 
rather than the KafkaETLContext.

If the scope of these changes is too big, I'd just fix SimpleKafkaETLMapper so 
that it does not parse the payload bytes as a Message.
                
> Kafka Hadoop Consumer fails when verifying message checksum
> -----------------------------------------------------------
>
>                 Key: KAFKA-946
>                 URL: https://issues.apache.org/jira/browse/KAFKA-946
>             Project: Kafka
>          Issue Type: Bug
>          Components: contrib
>    Affects Versions: 0.8
>            Reporter: Sam Meder
>            Priority: Critical
>         Attachments: hadoop_consumer.patch
>
>
> The code tries to verify the checksum, but fails because the data available 
> isn't the same. In KafkaETLContext:
>     protected boolean get(KafkaETLKey key, BytesWritable value) throws 
>             IOException {
>         if (_messageIt != null && _messageIt.hasNext()) {
>             MessageAndOffset messageAndOffset = _messageIt.next();
>             ByteBuffer buf = messageAndOffset.message().payload();
>             int origSize = buf.remaining();
>             byte[] bytes = new byte[origSize];
>             buf.get(bytes, buf.position(), origSize);
>             value.set(bytes, 0, origSize);
>             key.set(_index, _offset, messageAndOffset.message().checksum());
>             _offset = messageAndOffset.nextOffset();  // increase offset
>             _count++;  // increase count
>             return true;
>         }
>         else return false;
>     }
> Note that the message payload is used and the message checksum is included in 
> the key. Then, in SimpleKafkaETLMapper:
>     @Override
>     public void map(KafkaETLKey key, BytesWritable val,
>             OutputCollector<LongWritable, Text> collector,
>             Reporter reporter) throws IOException {
>         byte[] bytes = KafkaETLUtils.getBytes(val);
>         // check the checksum of the message
>         Message message = new Message(bytes);
>         long checksum = key.getChecksum();
>         if (checksum != message.checksum())
>             throw new IOException("Invalid message checksum "
>                     + message.checksum() + ". Expected " + key + ".");
> the Message object is initialized with the payload bytes and a new checksum 
> is calculated. The problem is that the original message checksum also covers 
> the key, so checksum verification fails...

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
