Robert Joseph Evans created STORM-2225:
------------------------------------------

             Summary: Kafka New API make simple things simple
                 Key: STORM-2225
                 URL: https://issues.apache.org/jira/browse/STORM-2225
             Project: Apache Storm
          Issue Type: Improvement
          Components: storm-kafka-client
    Affects Versions: 1.0.0, 2.0.0
            Reporter: Robert Joseph Evans
            Assignee: Robert Joseph Evans


The Kafka spouts in storm-kafka-client use the new Kafka consumer API and are very extensible, 
but doing very simple things takes far too many lines of code.

For example, to create a KafkaTridentSpoutOpaque you need the following code 
(taken from the example).

{code}
    private KafkaTridentSpoutOpaque<String, String> newKafkaTridentSpoutOpaque() {
        return new KafkaTridentSpoutOpaque<>(new KafkaTridentSpoutManager<>(
                        newKafkaSpoutConfig(
                        newKafkaSpoutStreams())));
    }

    private KafkaSpoutConfig<String,String> newKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
        return new KafkaSpoutConfig.Builder<>(newKafkaConsumerProps(),
                    kafkaSpoutStreams, newTuplesBuilder(), newRetryService())
                .setOffsetCommitPeriodMs(10_000)
                .setFirstPollOffsetStrategy(EARLIEST)
                .setMaxUncommittedOffsets(250)
                .build();
    }

    protected Map<String,Object> newKafkaConsumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092");
        props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
        props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.partition.fetch.bytes", 200);
        return props;
    }

    protected KafkaSpoutTuplesBuilder<String, String> newTuplesBuilder() {
        return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
                new TopicsTupleBuilder<String, String>(TOPIC_1, TOPIC_2))
                .build();
    }

    protected KafkaSpoutRetryService newRetryService() {
        return new KafkaSpoutRetryExponentialBackoff(
                new KafkaSpoutRetryExponentialBackoff.TimeInterval(500L, TimeUnit.MICROSECONDS),
                KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
                Integer.MAX_VALUE,
                KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
    }

    protected KafkaSpoutStreams newKafkaSpoutStreams() {
        return new KafkaSpoutStreamsNamedTopics.Builder(new Fields("str"),
                new String[]{"test-trident", "test-trident-1"}).build();
    }

    protected static class TopicsTupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
        public TopicsTupleBuilder(String... topics) {
            super(topics);
        }
        @Override
        public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
            return new Values(consumerRecord.value());
        }
    }
{code}

All of this just to get a Trident spout that reads <String, String> records 
from "localhost:9092" on the topics "test-trident" and "test-trident-1" and 
outputs the value as the field "str".

I shouldn't need 50 lines of code for something I can explain in 3 lines of 
text. It feels like we need better defaults and less boilerplate for many of 
these common cases.
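As a rough illustration of what "better defaults" could mean, here is a self-contained sketch of a defaulting builder. Every name in it (SimpleSpoutConfigSketch, builder, setProp, setTranslator, and the SimpleRecord stand-in for ConsumerRecord) is hypothetical and is not part of storm-kafka-client. The defaults (10 s commit period, EARLIEST, 250 uncommitted offsets) are copied from the example above, and deriving the group id from the topic names is an assumption made up for illustration. Only the Kafka property names and the StringDeserializer class name are real.

{code:java}
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: none of these classes or methods exist in storm-kafka-client.
class SimpleSpoutConfigSketch {

    enum FirstPollOffsetStrategy { EARLIEST, LATEST }

    // Minimal stand-in for Kafka's ConsumerRecord so the sketch has no dependencies.
    static final class SimpleRecord<K, V> {
        final String topic;
        final K key;
        final V value;
        SimpleRecord(String topic, K key, V value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    // Immutable result of the builder.
    static final class SimpleConfig<K, V> {
        final Map<String, Object> consumerProps;
        final List<String> topics;
        final Function<SimpleRecord<K, V>, List<Object>> translator;
        final List<String> outputFields;
        final long offsetCommitPeriodMs;
        final FirstPollOffsetStrategy firstPollOffsetStrategy;
        final int maxUncommittedOffsets;

        SimpleConfig(Builder<K, V> b) {
            consumerProps = Collections.unmodifiableMap(new HashMap<>(b.props));
            topics = b.topics;
            translator = b.translator;
            outputFields = b.outputFields;
            offsetCommitPeriodMs = b.offsetCommitPeriodMs;
            firstPollOffsetStrategy = b.firstPollOffsetStrategy;
            maxUncommittedOffsets = b.maxUncommittedOffsets;
        }
    }

    static final class Builder<K, V> {
        private final Map<String, Object> props = new HashMap<>();
        private final List<String> topics;
        // Default translator: emit topic, key and value as three fields.
        private Function<SimpleRecord<K, V>, List<Object>> translator =
                r -> Arrays.<Object>asList(r.topic, r.key, r.value);
        private List<String> outputFields = Arrays.asList("topic", "key", "value");
        // Assumed defaults, copied from the values used in the issue's example.
        private long offsetCommitPeriodMs = 10_000;
        private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.EARLIEST;
        private int maxUncommittedOffsets = 250;

        private Builder(String bootstrapServers, String... topics) {
            props.put("bootstrap.servers", bootstrapServers);
            this.topics = Arrays.asList(topics);
        }

        // Escape hatch: any raw Kafka consumer property can still be set or overridden.
        Builder<K, V> setProp(String key, Object value) {
            props.put(key, value);
            return this;
        }

        // One lambda replaces the KafkaSpoutTupleBuilder subclass.
        Builder<K, V> setTranslator(Function<SimpleRecord<K, V>, List<Object>> translator, String... fields) {
            this.translator = translator;
            this.outputFields = Arrays.asList(fields);
            return this;
        }

        SimpleConfig<K, V> build() {
            // Assumed convention: derive a group id from the topics unless one was set.
            props.putIfAbsent("group.id", "storm-" + String.join("-", topics));
            return new SimpleConfig<>(this);
        }
    }

    // String keys and values are the common case, so they get a dedicated entry point.
    static Builder<String, String> builder(String bootstrapServers, String... topics) {
        return new Builder<String, String>(bootstrapServers, topics)
                .setProp("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
                .setProp("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public static void main(String[] args) {
        SimpleConfig<String, String> config =
                builder("127.0.0.1:9092", "test-trident", "test-trident-1")
                        .setTranslator(r -> Collections.<Object>singletonList(r.value), "str")
                        .build();
        System.out.println(config.consumerProps.get("group.id") + " " + config.outputFields);
    }
}
{code}

With String keys and values treated as the common case, the spout described above shrinks to a single builder chain, while setProp keeps every raw consumer property (for example max.partition.fetch.bytes) reachable when a caller needs it.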



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
