[ 
https://issues.apache.org/jira/browse/STORM-826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14601226#comment-14601226
 ] 

ASF GitHub Bot commented on STORM-826:
--------------------------------------

Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/storm/pull/572#discussion_r33256725
  
    --- Diff: external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java 
---
    @@ -126,21 +169,33 @@ public void executeWithByteArrayKeyAndMessage() {
         private KafkaBolt generateStringSerializerBolt() {
             KafkaBolt bolt = new KafkaBolt();
             Properties props = new Properties();
    -        props.put("metadata.broker.list", 
broker.getBrokerConnectionString());
             props.put("request.required.acks", "1");
             props.put("serializer.class", "kafka.serializer.StringEncoder");
    +        props.put("bootstrap.servers", broker.getBrokerConnectionString());
    +        props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
    +        props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
    +        props.put("metadata.fetch.timeout.ms", 1000);
             config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
             bolt.prepare(config, null, new OutputCollector(collector));
    +        bolt.setAsync(false);
             return bolt;
         }
     
    -    private KafkaBolt generateDefaultSerializerBolt() {
    +    private KafkaBolt generateDefaultSerializerBolt(boolean async, boolean 
fireAndForget) {
             KafkaBolt bolt = new KafkaBolt();
             Properties props = new Properties();
    -        props.put("metadata.broker.list", 
broker.getBrokerConnectionString());
             props.put("request.required.acks", "1");
    +        props.put("serializer.class", "kafka.serializer.StringEncoder");
    +        props.put("bootstrap.servers", broker.getBrokerConnectionString());
    +        props.put("key.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
    +        props.put("value.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
    +        props.put("metadata.fetch.timeout.ms", 1000);
    +        props.put("batch.size", 1);
    --- End diff --
    
    batch size is in bytes, 1 seems unnecessarily small.


> As a storm developer I’d like to use the new kafka producer API to reduce 
> dependencies and use long term supported kafka apis 
> ------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: STORM-826
>                 URL: https://issues.apache.org/jira/browse/STORM-826
>             Project: Apache Storm
>          Issue Type: Story
>          Components: storm-kafka
>            Reporter: Thomas Becker
>            Assignee: Zhuo Liu
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to