[
https://issues.apache.org/jira/browse/KAFKA-3?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Joel Koshy updated KAFKA-3:
---------------------------
Attachment: KAFKA-3_v1.patch
Here is a first stab at plugging in a decoder for the consumer API. The
html docs/quickstart also need to be updated after this is finalized, so
this is more of a checkpoint review request.
The consumer API becomes a bit clunky with the parameterized type, so if
anyone has ideas on making this simpler/cleaner, please comment.
There is an example of the API in action in the new test under
ZookeeperConsumerConnectorTest, but here is a summary:
// create consumerConfig and set config.deserializerClass to
// MyDecoder (extends Decoder[MyRichType])
val consumer = new ZookeeperConsumerConnector[MyRichType](consumerConfig)
// each stream is of type KafkaMessageStream[MyRichType]
val streams = consumer.createMessageStreams(topicMap)
for (stream <- streams) {
  for (myRichTypeMessage <- stream) {
    // use myRichTypeMessage: MyRichType
  }
}
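To make the shape of the plug-in point concrete, here is a minimal,
self-contained sketch of how a decoder could turn raw messages into a typed
stream. Message, Decoder, toEvent and DecodedStream below are simplified
stand-ins assumed for illustration; they are not the classes or signatures
in the patch.

```scala
// Hypothetical stand-ins; names and signatures are assumptions, not the patch's API.
final case class Message(payload: Array[Byte])

// A decoder converts a raw Message into a client-defined type T.
trait Decoder[T] {
  def toEvent(message: Message): T
}

// Example rich type and a decoder for it.
final case class MyRichType(text: String)

class MyDecoder extends Decoder[MyRichType] {
  def toEvent(message: Message): MyRichType =
    MyRichType(new String(message.payload, "UTF-8"))
}

// A stream wrapper that decodes each message as the client iterates.
class DecodedStream[T](raw: Iterable[Message], decoder: Decoder[T]) {
  def iterator: Iterator[T] = raw.iterator.map(m => decoder.toEvent(m))
}

object DecoderSketch {
  def demo(): List[MyRichType] = {
    val raw = List(Message("a".getBytes("UTF-8")), Message("b".getBytes("UTF-8")))
    val stream = new DecodedStream[MyRichType](raw, new MyDecoder)
    stream.iterator.toList
  }
}
```

The client only ever sees values of type MyRichType; the raw bytes stay
behind the decoder.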
- With this approach, createMessageStreams can handle at most one decoder
class. A client that needs different decoders for different topics has to
create a separate consumer connector, each with its own deserializer. One
way to avoid that would be to make the deserializer config a
topic:deserializer map and move the type parameter to the
createMessageStreams method. The client would still need to call
createMessageStreams once per deserializer type, but would not need
multiple connector objects. I am not sure this adds much value, though,
and it makes the config more cumbersome.
- For ConsoleConsumer, the MessageFormatter cannot be parameterized from the
config. That creates some redundancy, since the formatter presumably needs
to decode the message itself. One option is a single class that implements
both the decoder and formatter traits, so that the formatter can use the
decoder when necessary.
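The two alternatives above can be sketched roughly as follows. This is only
an illustration under assumed names: SketchConnector, the writeTo signature,
and the in-memory topic data are hypothetical and do not reflect the patch
or the existing ConsoleConsumer code.

```scala
import java.io.{ByteArrayOutputStream, PrintStream}

// All names below are hypothetical stand-ins, not Kafka's actual API.
final case class Message(payload: Array[Byte])

trait Decoder[T] { def toEvent(message: Message): T }
trait MessageFormatter { def writeTo(message: Message, output: PrintStream): Unit }

class StringDecoder extends Decoder[String] {
  def toEvent(message: Message): String = new String(message.payload, "UTF-8")
}

class IntDecoder extends Decoder[Int] {
  def toEvent(message: Message): Int = new String(message.payload, "UTF-8").trim.toInt
}

// First idea: one connector, a topic -> decoder map, and the type parameter
// on createMessageStreams instead of on the connector.
class SketchConnector(decoders: Map[String, Decoder[_]],
                      data: Map[String, List[Message]]) {
  def createMessageStreams[T](topic: String): Iterator[T] = {
    // Unchecked cast: the caller must request the type that the configured
    // decoder for this topic actually produces.
    val decoder = decoders(topic).asInstanceOf[Decoder[T]]
    data.getOrElse(topic, Nil).iterator.map(m => decoder.toEvent(m))
  }
}

// Second idea: a single class that is both the decoder and the formatter,
// so the formatter reuses the decoding logic.
class StringDecoderFormatter extends StringDecoder with MessageFormatter {
  def writeTo(message: Message, output: PrintStream): Unit =
    output.println(toEvent(message))
}

object PerTopicSketch {
  private def msg(s: String): Message = Message(s.getBytes("UTF-8"))

  def demo(): (List[String], List[Int], String) = {
    val decoders: Map[String, Decoder[_]] =
      Map("logs" -> new StringDecoder, "counts" -> new IntDecoder)
    val data = Map("logs" -> List(msg("hello")), "counts" -> List(msg("1"), msg("2")))
    val connector = new SketchConnector(decoders, data)

    val out = new ByteArrayOutputStream()
    new StringDecoderFormatter().writeTo(msg("hi"), new PrintStream(out, true, "UTF-8"))

    (connector.createMessageStreams[String]("logs").toList,
     connector.createMessageStreams[Int]("counts").toList,
     out.toString("UTF-8").trim)
  }
}
```

Note the unchecked cast in createMessageStreams: with a topic:deserializer
map, the compiler can no longer tie the requested type to the configured
decoder, which is part of what makes this variant more cumbersome.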
Since this is a client API change, it would be great to discuss and resolve
any concerns about it at this point. Should we post this comment on
kafka-user as well?
> Consumer needs a pluggable decoder
> ----------------------------------
>
> Key: KAFKA-3
> URL: https://issues.apache.org/jira/browse/KAFKA-3
> Project: Kafka
> Issue Type: Improvement
> Attachments: KAFKA-3_v1.patch
>
>
> Kafka producer allows a user to plug in an encoder (from type T to Message).
> We need to do the same thing on the consumer side, by allowing a user to plug
> in a decoder (from Message to type T).