Robert Joseph Evans created STORM-2225:
------------------------------------------
Summary: Kafka New API make simple things simple
Key: STORM-2225
URL: https://issues.apache.org/jira/browse/STORM-2225
Project: Apache Storm
Issue Type: Improvement
Components: storm-kafka-client
Affects Versions: 1.0.0, 2.0.0
Reporter: Robert Joseph Evans
Assignee: Robert Joseph Evans

The Kafka spouts in storm-kafka-client use the new API and are very extensible, but doing very simple things takes way too many lines of code. For example, to create a KafkaTridentSpoutOpaque you need the following code (from the example).

{code}
private KafkaTridentSpoutOpaque<String, String> newKafkaTridentSpoutOpaque() {
    return new KafkaTridentSpoutOpaque<>(new KafkaTridentSpoutManager<>(
            newKafkaSpoutConfig(
            newKafkaSpoutStreams())));
}

private KafkaSpoutConfig<String,String> newKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
    return new KafkaSpoutConfig.Builder<>(newKafkaConsumerProps(),
                kafkaSpoutStreams, newTuplesBuilder(), newRetryService())
            .setOffsetCommitPeriodMs(10_000)
            .setFirstPollOffsetStrategy(EARLIEST)
            .setMaxUncommittedOffsets(250)
            .build();
}

protected Map<String,Object> newKafkaConsumerProps() {
    Map<String, Object> props = new HashMap<>();
    props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092");
    props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
    props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("max.partition.fetch.bytes", 200);
    return props;
}

protected KafkaSpoutTuplesBuilder<String, String> newTuplesBuilder() {
    return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
            new TopicsTupleBuilder<String, String>(TOPIC_1, TOPIC_2))
            .build();
}

protected KafkaSpoutRetryService newRetryService() {
    return new KafkaSpoutRetryExponentialBackoff(new KafkaSpoutRetryExponentialBackoff.TimeInterval(500L, TimeUnit.MICROSECONDS),
            KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
}

protected KafkaSpoutStreams newKafkaSpoutStreams() {
    return new KafkaSpoutStreamsNamedTopics.Builder(new Fields("str"), new String[]{"test-trident","test-trident-1"}).build();
}

protected static class TopicsTupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
    public TopicsTupleBuilder(String... topics) {
        super(topics);
    }

    @Override
    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
        return new Values(consumerRecord.value());
    }
}
{code}

All of this so I can have a Trident spout that reads <String, String> values from "localhost:9092" on the topics "test-trident" and "test-trident-1" and outputs the value as the field "str". I shouldn't need 50 lines of code for something I can explain in 3 lines of text. It feels like we need better defaults and less overhead for a lot of these things.
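To make "better defaults" concrete, here is a minimal, self-contained sketch of a builder where only the brokers and topics are required and everything else falls back to a default. Everything in it is hypothetical and is NOT the storm-kafka-client API: the class `SimpleSpoutConfig`, its `builder(...)`, `setProp`, `setOutputField` and `toTuple` methods, the default group id `storm-kafka-spout`, and the default output field name `value` are all invented for illustration. Only the Kafka consumer property names (`bootstrap.servers`, `group.id`, `key.deserializer`, `value.deserializer`) and the `StringDeserializer` class name come from Kafka itself. The sketch only assembles configuration values; it does not create a spout.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// HYPOTHETICAL sketch, not part of storm-kafka-client: shows how a builder with
// sensible defaults could replace most of the boilerplate in the example above.
final class SimpleSpoutConfig {
    private static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    final Map<String, Object> consumerProps; // Kafka consumer properties
    final List<String> topics;               // topics to subscribe to
    final String outputField;                // name of the single emitted field

    private SimpleSpoutConfig(Builder b) {
        this.consumerProps = Collections.unmodifiableMap(new HashMap<>(b.props));
        this.topics = b.topics;
        this.outputField = b.outputField;
    }

    // The only required inputs: where the brokers are and which topics to read.
    static Builder builder(String bootstrapServers, String... topics) {
        return new Builder(bootstrapServers, topics);
    }

    // Default tuple: just the record value, like TopicsTupleBuilder in the example.
    List<Object> toTuple(String recordValue) {
        return Collections.<Object>singletonList(recordValue);
    }

    static final class Builder {
        private final Map<String, Object> props = new HashMap<>();
        private final List<String> topics;
        private String outputField = "value"; // hypothetical default field name

        private Builder(String bootstrapServers, String... topics) {
            Objects.requireNonNull(bootstrapServers, "bootstrapServers");
            if (topics.length == 0) {
                throw new IllegalArgumentException("at least one topic is required");
            }
            this.topics = Collections.unmodifiableList(Arrays.asList(topics.clone()));
            props.put("bootstrap.servers", bootstrapServers);
            props.put("group.id", "storm-kafka-spout"); // hypothetical default group id
            // <String, String> is assumed to be the common case, so it is the default.
            props.put("key.deserializer", STRING_DESERIALIZER);
            props.put("value.deserializer", STRING_DESERIALIZER);
        }

        // Any default (or extra setting) can still be overridden individually.
        Builder setProp(String key, Object value) {
            props.put(key, value);
            return this;
        }

        Builder setOutputField(String name) {
            this.outputField = Objects.requireNonNull(name, "name");
            return this;
        }

        SimpleSpoutConfig build() {
            return new SimpleSpoutConfig(this);
        }
    }
}
```

With a builder like this, the configuration part of the example would shrink to roughly `SimpleSpoutConfig.builder("127.0.0.1:9092", "test-trident", "test-trident-1").setOutputField("str").build()`, while a non-default setting such as `max.partition.fetch.bytes` remains a single `setProp` call.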