[ 
https://issues.apache.org/jira/browse/KAFKA-5799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-5799.
------------------------------
    Resolution: Auto Closed

Closing Apache Storm - Kafka Spout related query.  If this still issue, please 
contact storm mailing list.

> New KafkaSpoutConfig(Scheme)-ByteArrayKeyValueScheme
> ----------------------------------------------------
>
>                 Key: KAFKA-5799
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5799
>             Project: Kafka
>          Issue Type: New Feature
>    Affects Versions: 0.11.0.0
>         Environment: apache-storm 1.1.0
>            Reporter: Juhong NamGung
>            Priority: Minor
>         Attachments: 1.JPG, 2.JPG, bakvs.JPG
>
>
> I try to integrate Kafka with Apache Strom.
> I want to get data from Kafka, using KafkaSpout in Apache Storm. 
> To get data from Kafka using KafkaSpout, SpoutConfig-scheme must be setting. 
> (Scheme is an interface that dictates how the ByteBuffer consumed from Kafka 
> gets transformed into a storm tuple)
> I want to get both key and value in Kafka, so I used to KafkaSpoutConfig 
> ‘KeyValueSchemeAsMultiScheme’.
> KeyValueSchemeAsMultiScheme’s Constructor is as follows.
> [^2.JPG]
> But, as you can see in the picture, implementing classes of Interface 
> KeyValueScheme are only StringKeyValueScheme.
> [^1.JPG]
> Using StringKeyValueShceme causes problems when importing Integer data from 
> Kafka. Because StringKeyValueScheme deserialize Bytebuffer to String.
> So I implement ByteArrayKeyValueScheme that deserialize ByteBuffer to 
> ByteArray.
> ByteArrayKeyValueScheme imports data as BtyeArray.
> If you use ByteArrayKeyValueScheme, you can import data regardless of data 
> type from Kafka without error.
> (But, you should convert data type ByteArray to data type that you want(e.g. 
> String, Integer...))
> [^bakvs.JPG]
> {code:java}
> // Some comments here
> import java.nio.ByteBuffer;
> import java.util.List;
> import org.apache.storm.kafka.KeyValueScheme;
> import org.apache.storm.spout.RawScheme;
> import org.apache.storm.tuple.Values;
> import com.google.common.collect.ImmutableMap;
> public class ByteArrayKeyValueScheme extends RawScheme implements 
> KeyValueScheme {
>       @Override
>       public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer 
> value) {
>               // TODO Auto-generated method stub
>               if (key == null) {
>                       return deserialize(value);
>               }
>               Object keytuple = deserialize(key).get(0);
>               Object valuetuple = deserialize(value).get(0);
>               return new Values(ImmutableMap.of(keytuple, valuetuple));
>       }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to