[ https://issues.apache.org/jira/browse/STORM-2225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15740854#comment-15740854 ]
Hugo Louro commented on STORM-2225:
-----------------------------------

[~revans2] are you planning on backporting these changes to the 1.x-branch? I am asking because some folks are already running the implementation that predates your proposed changes in production.

Thanks

> Kafka New API make simple things simple
> ---------------------------------------
>
>                 Key: STORM-2225
>                 URL: https://issues.apache.org/jira/browse/STORM-2225
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-kafka-client
>    Affects Versions: 1.0.0, 2.0.0
>            Reporter: Robert Joseph Evans
>            Assignee: Robert Joseph Evans
>          Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> The Kafka spouts in storm-kafka-client use the new API and are very
> extensible, but doing very simple things takes way too many lines of code.
> For example, to create a KafkaTridentSpoutOpaque you need the following code
> (from the example).
> {code}
>     private KafkaTridentSpoutOpaque<String, String> newKafkaTridentSpoutOpaque() {
>         return new KafkaTridentSpoutOpaque<>(new KafkaTridentSpoutManager<>(
>                 newKafkaSpoutConfig(
>                         newKafkaSpoutStreams())));
>     }
>
>     private KafkaSpoutConfig<String,String> newKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
>         return new KafkaSpoutConfig.Builder<>(newKafkaConsumerProps(),
>                 kafkaSpoutStreams, newTuplesBuilder(), newRetryService())
>                 .setOffsetCommitPeriodMs(10_000)
>                 .setFirstPollOffsetStrategy(EARLIEST)
>                 .setMaxUncommittedOffsets(250)
>                 .build();
>     }
>
>     protected Map<String,Object> newKafkaConsumerProps() {
>         Map<String, Object> props = new HashMap<>();
>         props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092");
>         props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
>         props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER,
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER,
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("max.partition.fetch.bytes", 200);
>         return props;
>     }
>
>     protected KafkaSpoutTuplesBuilder<String, String> newTuplesBuilder() {
>         return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
>                 new TopicsTupleBuilder<String, String>(TOPIC_1, TOPIC_2))
>                 .build();
>     }
>
>     protected KafkaSpoutRetryService newRetryService() {
>         return new KafkaSpoutRetryExponentialBackoff(
>                 new KafkaSpoutRetryExponentialBackoff.TimeInterval(500L, TimeUnit.MICROSECONDS),
>                 KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
>                 Integer.MAX_VALUE,
>                 KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
>     }
>
>     protected KafkaSpoutStreams newKafkaSpoutStreams() {
>         return new KafkaSpoutStreamsNamedTopics.Builder(new Fields("str"),
>                 new String[]{"test-trident","test-trident-1"}).build();
>     }
>
>     protected static class TopicsTupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
>         public TopicsTupleBuilder(String... topics) {
>             super(topics);
>         }
>
>         @Override
>         public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
>             return new Values(consumerRecord.value());
>         }
>     }
> {code}
> All of this so I can have a trident spout that reads <String, String> values
> from "localhost:9092" on the topics "test-trident" and "test-trident-1" and
> outputs the value as the field "str".
> I shouldn't need 50 lines of code for something I can explain in 3 lines of
> text. It feels like we need to have some better defaults, and less overhead
> on a lot of these things.
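For illustration only, here is a rough sketch of what "better defaults" could look like: a builder that needs only the bootstrap servers and the topic names, with everything else optional. `SimpleSpoutConfig` and all of its methods are hypothetical names made up for this sketch and are not the storm-kafka-client API. The default values (group id, String deserializers, 10 s commit period, 250 uncommitted offsets, output field "str") are simply copied from the example in the description, and the property keys are the standard Kafka consumer keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical config holder; NOT part of storm-kafka-client. */
final class SimpleSpoutConfig {
    final Map<String, Object> consumerProps;
    final List<String> topics;
    final String outputField;
    final long offsetCommitPeriodMs;
    final int maxUncommittedOffsets;

    private SimpleSpoutConfig(Builder b) {
        this.consumerProps = Collections.unmodifiableMap(new HashMap<>(b.consumerProps));
        this.topics = Collections.unmodifiableList(new ArrayList<>(b.topics));
        this.outputField = b.outputField;
        this.offsetCommitPeriodMs = b.offsetCommitPeriodMs;
        this.maxUncommittedOffsets = b.maxUncommittedOffsets;
    }

    /** Only the two things every user must supply are mandatory. */
    static Builder builder(String bootstrapServers, String... topics) {
        return new Builder(bootstrapServers, topics);
    }

    static final class Builder {
        private static final String STRING_DESERIALIZER =
                "org.apache.kafka.common.serialization.StringDeserializer";

        private final Map<String, Object> consumerProps = new HashMap<>();
        private final List<String> topics;
        // Defaults below are assumptions taken from the example in the issue.
        private String outputField = "str";
        private long offsetCommitPeriodMs = 10_000;
        private int maxUncommittedOffsets = 250;

        private Builder(String bootstrapServers, String... topics) {
            if (topics.length == 0) {
                throw new IllegalArgumentException("at least one topic is required");
            }
            this.topics = Arrays.asList(topics.clone());
            consumerProps.put("bootstrap.servers", bootstrapServers);
            consumerProps.put("group.id", "kafkaSpoutTestGroup");
            consumerProps.put("key.deserializer", STRING_DESERIALIZER);
            consumerProps.put("value.deserializer", STRING_DESERIALIZER);
        }

        /** Overrides or adds a single Kafka consumer property. */
        Builder setProp(String key, Object value) {
            consumerProps.put(key, value);
            return this;
        }

        Builder setOutputField(String field) {
            this.outputField = field;
            return this;
        }

        Builder setOffsetCommitPeriodMs(long ms) {
            this.offsetCommitPeriodMs = ms;
            return this;
        }

        Builder setMaxUncommittedOffsets(int max) {
            this.maxUncommittedOffsets = max;
            return this;
        }

        SimpleSpoutConfig build() {
            return new SimpleSpoutConfig(this);
        }
    }
}
```

With something of this shape, the use case from the description would collapse to roughly `SimpleSpoutConfig.builder("127.0.0.1:9092", "test-trident", "test-trident-1").build()`, and callers would reach for `setProp(...)` and the other setters only when the defaults do not fit.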