[ https://issues.apache.org/jira/browse/KAFKA-5799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16389064#comment-16389064 ]
Ewen Cheslack-Postava commented on KAFKA-5799: ---------------------------------------------- Is there an actual Kafka issue here? It looks to me like this is just an issue with the interfaces/types used in Storm. Kafka allows you to deserialize to whatever types you like and the KeyValueScheme issues seem to be limitations of Storm interfaces. Perhaps this is better filed against Storm? > New KafkaSpoutConfig(Scheme)-ByteArrayKeyValueScheme > ---------------------------------------------------- > > Key: KAFKA-5799 > URL: https://issues.apache.org/jira/browse/KAFKA-5799 > Project: Kafka > Issue Type: New Feature > Affects Versions: 0.11.0.0 > Environment: apache-storm 1.1.0 > Reporter: Juhong NamGung > Priority: Minor > Attachments: 1.JPG, 2.JPG, bakvs.JPG > > > I try to integrate Kafka with Apache Strom. > I want to get data from Kafka, using KafkaSpout in Apache Storm. > To get data from Kafka using KafkaSpout, SpoutConfig-scheme must be setting. > (Scheme is an interface that dictates how the ByteBuffer consumed from Kafka > gets transformed into a storm tuple) > I want to get both key and value in Kafka, so I used to KafkaSpoutConfig > ‘KeyValueSchemeAsMultiScheme’. > KeyValueSchemeAsMultiScheme’s Constructor is as follows. > [^2.JPG] > But, as you can see in the picture, implementing classes of Interface > KeyValueScheme are only StringKeyValueScheme. > [^1.JPG] > Using StringKeyValueShceme causes problems when importing Integer data from > Kafka. Because StringKeyValueScheme deserialize Bytebuffer to String. > So I implement ByteArrayKeyValueScheme that deserialize ByteBuffer to > ByteArray. > ByteArrayKeyValueScheme imports data as BtyeArray. > If you use ByteArrayKeyValueScheme, you can import data regardless of data > type from Kafka without error. > (But, you should convert data type ByteArray to data type that you want(e.g. > String, Integer...)) > [^bakvs.JPG] > {code:java} > // Some comments here > import java.nio.ByteBuffer; > import java.util.List; > import org.apache.storm.kafka.KeyValueScheme; > import org.apache.storm.spout.RawScheme; > import org.apache.storm.tuple.Values; > import com.google.common.collect.ImmutableMap; > public class ByteArrayKeyValueScheme extends RawScheme implements > KeyValueScheme { > @Override > public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer > value) { > // TODO Auto-generated method stub > if (key == null) { > return deserialize(value); > } > Object keytuple = deserialize(key).get(0); > Object valuetuple = deserialize(value).get(0); > return new Values(ImmutableMap.of(keytuple, valuetuple)); > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)