[ 
https://issues.apache.org/jira/browse/KAFKA-3?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-3:
---------------------------

    Attachment: KAFKA-3_v1.patch


Here is a first stab at plugging in a decoder for the consumer API.  The
html docs/quickstart also need to be updated after this is finalized, so
this is more of a checkpoint review request.

The consumer API becomes a bit clunky with the parameterized type, so if
anyone has ideas for making it simpler or cleaner, please comment.

There is an example of the API in action in the new test under
ZookeeperConsumerConnectorTest, but here is a summary:

// Create consumerConfig and set config.deserializerClass to
// MyDecoder (extends Decoder[MyRichType]).

val consumer = new ZookeeperConsumerConnector[MyRichType](consumerConfig)
// Each stream is of type KafkaMessageStream[MyRichType].
val streams = consumer.createMessageStreams(topicMap)
for (stream <- streams) {
    for (myRichTypeMessage <- stream) {
        // use myRichTypeMessage: MyRichType
    }
}

- With this approach, createMessageStreams can handle at most one decoder
  class. If the client needs different decoders for different topics, the
  client will need to create separate consumer connectors with a different
  deserializer for each one. One possible way to avoid that would be to make
  the deserializer config a topic:deserializer map, and move the type
  parameter to the createMessageStreams method. The client will still need
  to call createMessageStreams for each deserializer type, but will not need
  multiple connector objects. I'm not sure this adds much value, though, and
  it makes the config more cumbersome.

- For ConsoleConsumer, the MessageFormatter cannot be parameterized from the
  config. So that poses a bit of a redundancy problem in that the formatter
  presumably needs to decode the message. I guess you can have a decoder
  implementation that implements both the decoder and formatter traits, and
  have the formatter implementation use the decoder if necessary.
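
To make the alternative in the first bullet concrete, here is a rough,
self-contained sketch with the type parameter on createMessageStreams
instead of on the connector. Message, Decoder, SketchConnector and the two
decoders are hypothetical stand-ins, not the classes in the patch, and the
decoder is passed in directly rather than looked up from a
topic:deserializer config map.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object PerTopicDecoderSketch {
  // Hypothetical stand-in for the raw Kafka message.
  final case class Message(payload: Array[Byte])

  // Hypothetical decoder trait (Message => T); the method name is an assumption.
  trait Decoder[T] { def decode(message: Message): T }

  object StringDecoder extends Decoder[String] {
    def decode(message: Message): String = new String(message.payload, UTF_8)
  }

  object IntDecoder extends Decoder[Int] {
    def decode(message: Message): Int =
      new String(message.payload, UTF_8).trim.toInt
  }

  // Toy connector: raw messages per topic are simply held in memory.
  class SketchConnector(raw: Map[String, Seq[Message]]) {
    // The type parameter lives on the method, so one connector can hand out
    // streams of different types, one call per decoder type.
    def createMessageStreams[T](topics: Set[String],
                                decoder: Decoder[T]): Map[String, Iterator[T]] =
      topics.map { topic =>
        val messages = raw.getOrElse(topic, Seq.empty[Message])
        topic -> messages.iterator.map(m => decoder.decode(m))
      }.toMap
  }
}
```

With this shape the client makes one createMessageStreams call per decoder
type against the same connector, which is the trade-off described in the
first bullet.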

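For the ConsoleConsumer point in the second bullet, a single class
implementing both traits might look roughly like the sketch below. The
Message, Decoder and MessageFormatter definitions are simplified,
hypothetical stand-ins (the real traits may have different methods), and the
"id:name" payload layout of MyRichType is invented purely for illustration.

```scala
import java.io.PrintStream
import java.nio.charset.StandardCharsets.UTF_8

object DecoderFormatterSketch {
  // Hypothetical stand-ins; the real traits may differ.
  final case class Message(payload: Array[Byte])

  trait Decoder[T] { def decode(message: Message): T }

  trait MessageFormatter {
    def writeTo(message: Message, output: PrintStream): Unit
  }

  final case class MyRichType(id: Int, name: String)

  // One class plays both roles, so the formatter reuses the decoding logic
  // instead of duplicating it.
  class MyRichTypeDecoder extends Decoder[MyRichType] with MessageFormatter {
    // Assumes the payload is UTF-8 text of the form "id:name".
    def decode(message: Message): MyRichType = {
      val text = new String(message.payload, UTF_8)
      val sep = text.indexOf(':')
      MyRichType(text.substring(0, sep).toInt, text.substring(sep + 1))
    }

    def writeTo(message: Message, output: PrintStream): Unit = {
      val value = decode(message)
      output.println(s"id=${value.id} name=${value.name}")
    }
  }
}
```

The same class name could then be given both as the deserializer in the
consumer config and as the formatter for ConsoleConsumer.
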
Since this is a client API change, it would be great to discuss and resolve
any concerns about it at this point - should we post this comment on
kafka-user as well?


> Consumer needs a pluggable decoder
> ----------------------------------
>
>                 Key: KAFKA-3
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3
>             Project: Kafka
>          Issue Type: Improvement
>         Attachments: KAFKA-3_v1.patch
>
>
> Kafka producer allows a user to plug in an encoder (from type T to Message). 
> We need to do the same thing on the consumer side, by allowing a user to plug 
> in a decoder (from Message to type T).

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
