[ 
https://issues.apache.org/jira/browse/STORM-2225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15740854#comment-15740854
 ] 

Hugo Louro commented on STORM-2225:
-----------------------------------

[~revans2] are you planning to backport these changes to the 1.x-branch?

I ask because some folks are already running the implementation that 
predates your proposed changes in production.

Thanks

> Kafka New API make simple things simple
> ---------------------------------------
>
>                 Key: STORM-2225
>                 URL: https://issues.apache.org/jira/browse/STORM-2225
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-kafka-client
>    Affects Versions: 1.0.0, 2.0.0
>            Reporter: Robert Joseph Evans
>            Assignee: Robert Joseph Evans
>          Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> The Kafka spouts in storm-kafka-client use the new API and are very 
> extensible, but doing very simple things takes way too many lines of code.
> For example, to create a KafkaTridentSpoutOpaque you need the following code 
> (from the example).
> {code}
>     private KafkaTridentSpoutOpaque<String, String> newKafkaTridentSpoutOpaque() {
>         return new KafkaTridentSpoutOpaque<>(new KafkaTridentSpoutManager<>(
>                         newKafkaSpoutConfig(
>                         newKafkaSpoutStreams())));
>     }
>     private KafkaSpoutConfig<String,String> newKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
>         return new KafkaSpoutConfig.Builder<>(newKafkaConsumerProps(),
>                     kafkaSpoutStreams, newTuplesBuilder(), newRetryService())
>                 .setOffsetCommitPeriodMs(10_000)
>                 .setFirstPollOffsetStrategy(EARLIEST)
>                 .setMaxUncommittedOffsets(250)
>                 .build();
>     }
>     protected Map<String,Object> newKafkaConsumerProps() {
>         Map<String, Object> props = new HashMap<>();
>         props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092");
>         props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
>         props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER,
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER,
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("max.partition.fetch.bytes", 200);
>         return props;
>     }
>     protected KafkaSpoutTuplesBuilder<String, String> newTuplesBuilder() {
>         return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
>                 new TopicsTupleBuilder<String, String>(TOPIC_1, TOPIC_2))
>                 .build();
>     }
>     protected KafkaSpoutRetryService newRetryService() {
>         return new KafkaSpoutRetryExponentialBackoff(
>                 new KafkaSpoutRetryExponentialBackoff.TimeInterval(500L, TimeUnit.MICROSECONDS),
>                 KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
>                 Integer.MAX_VALUE,
>                 KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
>     }
>     protected KafkaSpoutStreams newKafkaSpoutStreams() {
>         return new KafkaSpoutStreamsNamedTopics.Builder(new Fields("str"),
>                 new String[]{"test-trident","test-trident-1"}).build();
>     }
>     protected static class TopicsTupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
>         public TopicsTupleBuilder(String... topics) {
>             super(topics);
>         }
>         @Override
>         public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
>             return new Values(consumerRecord.value());
>         }
>     }
> {code}
> All of this so I can have a trident spout that reads <String, String> values 
> from "localhost:9092" on the topics "test-trident" and "test-trident-1" and 
> outputs the value as the field "str".
> I shouldn't need 50 lines of code for something I can explain in 3 lines of 
> text.  It feels like we need better defaults and less overhead for a lot of 
> these things.
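
To make the "better defaults" idea concrete, here is a minimal, self-contained sketch of a builder that needs only the broker address and topic names, with everything else defaulted. This is not the storm-kafka-client API: `SimpleSpoutConfig`, its method names, and its default values are hypothetical, and the defaults are simply the numbers used in the example above (10 s commit period, 250 uncommitted offsets, output field "str").

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only: NOT a class in storm-kafka-client.
final class SimpleSpoutConfig {
    private final String bootstrapServers;
    private final List<String> topics;
    private final String outputField;
    private final long offsetCommitPeriodMs;
    private final int maxUncommittedOffsets;

    private SimpleSpoutConfig(Builder b) {
        this.bootstrapServers = b.bootstrapServers;
        this.topics = b.topics;
        this.outputField = b.outputField;
        this.offsetCommitPeriodMs = b.offsetCommitPeriodMs;
        this.maxUncommittedOffsets = b.maxUncommittedOffsets;
    }

    // The only required inputs: where the brokers are and what to read.
    static Builder builder(String bootstrapServers, String... topics) {
        return new Builder(bootstrapServers, topics);
    }

    String bootstrapServers() { return bootstrapServers; }
    List<String> topics() { return topics; }
    String outputField() { return outputField; }
    long offsetCommitPeriodMs() { return offsetCommitPeriodMs; }
    int maxUncommittedOffsets() { return maxUncommittedOffsets; }

    static final class Builder {
        private final String bootstrapServers;
        private final List<String> topics;
        // Assumed defaults, copied from the example above for illustration.
        private String outputField = "str";
        private long offsetCommitPeriodMs = 10_000L;
        private int maxUncommittedOffsets = 250;

        private Builder(String bootstrapServers, String... topics) {
            if (bootstrapServers == null || bootstrapServers.isEmpty()) {
                throw new IllegalArgumentException("bootstrapServers is required");
            }
            if (topics == null || topics.length == 0) {
                throw new IllegalArgumentException("at least one topic is required");
            }
            this.bootstrapServers = bootstrapServers;
            this.topics = Collections.unmodifiableList(Arrays.asList(topics.clone()));
        }

        // Optional overrides; callers touch these only when a default does not fit.
        Builder setOutputField(String field) { this.outputField = field; return this; }
        Builder setOffsetCommitPeriodMs(long ms) { this.offsetCommitPeriodMs = ms; return this; }
        Builder setMaxUncommittedOffsets(int n) { this.maxUncommittedOffsets = n; return this; }

        SimpleSpoutConfig build() { return new SimpleSpoutConfig(this); }
    }

    public static void main(String[] args) {
        // The whole use case from the description, in one statement.
        SimpleSpoutConfig config = SimpleSpoutConfig
                .builder("localhost:9092", "test-trident", "test-trident-1")
                .build();
        System.out.println(config.topics() + " -> field " + config.outputField());
    }
}
```

The point of the sketch is the shape rather than the names: required values go in the factory method, and everything else is an optional setter with a default, so the simple case stays close to the three lines it takes to describe.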



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
