Updated Branches: refs/heads/0.8 61b8b2bf8 -> bb7b45cd5
kafka-946; Kafka Hadoop Consumer fails when verifying message checksum; patched by Sam Meder; reviewed by Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bb7b45cd Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bb7b45cd Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bb7b45cd Branch: refs/heads/0.8 Commit: bb7b45cd5d454d64c3454b01f8f3f1e13ed26ff3 Parents: 61b8b2b Author: Sam Meder <[email protected]> Authored: Tue Sep 24 08:31:18 2013 -0700 Committer: Jun Rao <[email protected]> Committed: Tue Sep 24 08:31:18 2013 -0700 ---------------------------------------------------------------------- .../src/main/java/kafka/etl/KafkaETLContext.java | 2 +- .../src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/bb7b45cd/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java ---------------------------------------------------------------------- diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java index 8e98efc..1d0e0a9 100644 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java +++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java @@ -196,7 +196,7 @@ public class KafkaETLContext { if (_messageIt != null && _messageIt.hasNext()) { MessageAndOffset messageAndOffset = _messageIt.next(); - ByteBuffer buf = messageAndOffset.message().payload(); + ByteBuffer buf = messageAndOffset.message().buffer(); int origSize = buf.remaining(); byte[] bytes = new byte[origSize]; buf.get(bytes, buf.position(), origSize); http://git-wip-us.apache.org/repos/asf/kafka/blob/bb7b45cd/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java ---------------------------------------------------------------------- diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java index b0aadff..45cc921 100644 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java +++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java @@ -16,8 +16,6 @@ */ package kafka.etl.impl; -import java.io.IOException; -import java.nio.ByteBuffer; import kafka.etl.KafkaETLKey; import kafka.etl.KafkaETLUtils; import kafka.message.Message; @@ -29,6 +27,9 @@ import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; +import java.io.IOException; +import java.nio.ByteBuffer; + /** * Simple implementation of KafkaETLMapper. It assumes that * input data are text timestamp (long). @@ -59,7 +60,7 @@ Mapper<KafkaETLKey, BytesWritable, LongWritable, Text> { byte[] bytes = KafkaETLUtils.getBytes(val); //check the checksum of message - Message message = new Message(bytes); + Message message = new Message(ByteBuffer.wrap(bytes)); long checksum = key.getChecksum(); if (checksum != message.checksum()) throw new IOException ("Invalid message checksum "
